diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 858a571459876..ce0114368b563 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -62,7 +62,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.metadata.{ConfigRepository, MetadataCache} import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, DelegationTokenManager, ProcessRole} import org.apache.kafka.server.authorizer._ -import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion} +import org.apache.kafka.server.common.{GroupVersion, RequestLocal, StreamsVersion, TransactionVersion} import org.apache.kafka.server.config.DelegationTokenManagerConfigs import org.apache.kafka.server.share.context.ShareFetchContext import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey} @@ -2633,11 +2633,15 @@ class KafkaApis(val requestChannel: RequestChannel, } } } + } + private def streamsVersion(): StreamsVersion = { + StreamsVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(StreamsVersion.FEATURE_NAME, 0.toShort)) } private def isStreamsGroupProtocolEnabled: Boolean = { - config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS) + config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS) && + streamsVersion().streamsGroupSupported } def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b982e72e141fa..7f4b37cd61c0b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -379,10 +379,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) if (!protocols.contains(GroupType.CLASSIC)) { throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.") } - if (protocols.contains(GroupType.STREAMS)) { - warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance protocol are enabled. " + - "This is part of the early access of KIP-1071 and MUST NOT be used in production.") - } protocols } diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 88d25b65d934c..811e1d92c9bb3 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils} import org.apache.kafka.common.test.ClusterInstance import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, TransactionVersion} +import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, StreamsVersion, TransactionVersion} import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Tag @@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion ): Unit = { if (apiVersion >= 3) { - assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size()) + assertEquals(6, apiVersionsResponse.data().finalizedFeatures().size()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel()) - assertEquals(6, apiVersionsResponse.data().supportedFeatures().size()) + assertEquals(7, apiVersionsResponse.data().supportedFeatures().size()) assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion()) if (apiVersion < 4) { assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion()) @@ -88,6 +88,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion()) assertEquals(ShareVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion()) + + assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion()) + assertEquals(StreamsVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion()) } val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) { ApiVersionsResponse.collectApis( diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index e3ffce710a6b3..1f61cd9d989da 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -88,7 +88,7 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} -import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion} +import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, StreamsVersion, TransactionVersion} import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.metrics.ClientMetricsTestUtils import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey} @@ -9960,7 +9960,11 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupHeartbeatRequest(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") @@ -9971,9 +9975,7 @@ class KafkaApisTest extends Logging { requestChannelRequest.context, streamsGroupHeartbeatRequest )).thenReturn(future) - kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") - ) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() @@ -9986,7 +9988,12 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupHeartbeatRequestWithAuthorizedTopology(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) + val groupId = "group" val fooTopicName = "foo" val barTopicName = "bar" @@ -10037,8 +10044,7 @@ class KafkaApisTest extends Logging { streamsGroupHeartbeatRequest )).thenReturn(future) kafkaApis = createKafkaApis( - authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + authorizer = Some(authorizer) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10052,7 +10058,11 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") @@ -10063,9 +10073,7 @@ class KafkaApisTest extends Logging { requestChannelRequest.context, streamsGroupHeartbeatRequest )).thenReturn(future) - kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") - ) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) @@ -10075,7 +10083,11 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") @@ -10085,8 +10097,7 @@ class KafkaApisTest extends Logging { when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) .thenReturn(Seq(AuthorizationResult.DENIED).asJava) kafkaApis = createKafkaApis( - authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + authorizer = Some(authorizer) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10096,7 +10107,12 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) + val groupId = "group" val fooTopicName = "foo" val barTopicName = "bar" @@ -10137,8 +10153,7 @@ class KafkaApisTest extends Logging { } kafkaApis = createKafkaApis( - authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + authorizer = Some(authorizer) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10147,7 +10162,7 @@ class KafkaApisTest extends Logging { } @Test - def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = { + def testStreamsGroupHeartbeatRequestProtocolDisabledViaConfig(): Unit = { metadataCache = mock(classOf[KRaftMetadataCache]) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") @@ -10163,9 +10178,32 @@ class KafkaApisTest extends Logging { assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode) } + @Test + def testStreamsGroupHeartbeatRequestProtocolDisabledViaFeature(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 0.toShort)) + + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) + + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) + + kafkaApis = createKafkaApis() + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) + assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode) + } + @Test def testStreamsGroupHeartbeatRequestInvalidTopicNames(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology( new StreamsGroupHeartbeatRequestData.Topology() @@ -10182,9 +10220,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) - kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") - ) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) @@ -10194,7 +10230,11 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupHeartbeatRequestInternalTopicNames(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology( new StreamsGroupHeartbeatRequestData.Topology() @@ -10210,9 +10250,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) - kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") - ) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) @@ -10222,7 +10260,11 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreate(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group"); @@ -10234,9 +10276,7 @@ class KafkaApisTest extends Logging { streamsGroupHeartbeatRequest )).thenReturn(future) - kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") - ) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val missingTopics = Map("test" -> new CreatableTopic()) @@ -10251,7 +10291,11 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreateMissingCreateACL(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group"); @@ -10277,8 +10321,7 @@ class KafkaApisTest extends Logging { }.asJava }) kafkaApis = createKafkaApis( - authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + authorizer = Some(authorizer) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10472,7 +10515,12 @@ class KafkaApisTest extends Logging { @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) + val fooTopicName = "foo" val barTopicName = "bar" @@ -10487,9 +10535,7 @@ class KafkaApisTest extends Logging { any[RequestContext], any[util.List[String]] )).thenReturn(future) - kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") - ) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology() @@ -10580,7 +10626,11 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupDescribeAuthorizationFailed(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() streamsGroupDescribeRequestData.groupIds.add("group-id") @@ -10597,8 +10647,7 @@ class KafkaApisTest extends Logging { )).thenReturn(future) future.complete(List().asJava) kafkaApis = createKafkaApis( - authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + authorizer = Some(authorizer) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) @@ -10608,7 +10657,11 @@ class KafkaApisTest extends Logging { @Test def testStreamsGroupDescribeFutureFailed(): Unit = { + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() streamsGroupDescribeRequestData.groupIds.add("group-id") @@ -10619,9 +10672,7 @@ class KafkaApisTest extends Logging { any[RequestContext], any[util.List[String]] )).thenReturn(future) - kafkaApis = createKafkaApis( - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") - ) + kafkaApis = createKafkaApis() kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) @@ -10636,7 +10687,11 @@ class KafkaApisTest extends Logging { val barTopicName = "bar" val errorMessage = "The described group uses topics that the client is not authorized to describe." + val features = mock(classOf[FinalizedFeatures]) + when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME, 1.toShort)) + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.features()).thenReturn(features) val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() @@ -10668,8 +10723,7 @@ class KafkaApisTest extends Logging { any[util.List[String]] )).thenReturn(future) kafkaApis = createKafkaApis( - authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + authorizer = Some(authorizer) ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 101b8f43bc48a..42e262a6858fa 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -325,7 +325,7 @@ Found problem: properties.putAll(defaultStaticQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) assertEquals("Unsupported feature: non.existent.feature. Supported features are: " + - "eligible.leader.replicas.version, group.version, kraft.version, share.version, transaction.version", + "eligible.leader.replicas.version, group.version, kraft.version, share.version, streams.version, transaction.version", assertThrows(classOf[FormatterException], () => runFormatCommand(new ByteArrayOutputStream(), properties, Seq("--feature", "non.existent.feature=20"))).getMessage) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 1221f937e8a07..629bc895d3c21 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -63,7 +63,8 @@ public class GroupCoordinatorConfig { "The " + Group.GroupType.STREAMS + " rebalance protocol is in early access and therefore must not be used in production."; public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of( Group.GroupType.CLASSIC.toString(), - Group.GroupType.CONSUMER.toString()); + Group.GroupType.CONSUMER.toString(), + Group.GroupType.STREAMS.toString()); public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " + diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index eac143209dd6b..483defcc6af4e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -396,7 +396,7 @@ public void testCannotDowngradeBeforeMinimumKraftVersion() { build(); manager.replay(new FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())); assertEquals(ControllerResult.of(List.of(), new ApiError(Errors.INVALID_UPDATE_VERSION, - "Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-28")), + "Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-29")), manager.updateFeatures( Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL), Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 880bea07a9e7e..ebb6f5869690f 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -368,7 +368,7 @@ public void testInvalidFeatureFlag() throws Exception { formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1); assertEquals("Unsupported feature: nonexistent.feature. Supported features " + "are: eligible.leader.replicas.version, group.version, kraft.version, " + - "share.version, test.feature.version, transaction.version", + "share.version, streams.version, test.feature.version, transaction.version", assertThrows(FormatterException.class, () -> formatter1.formatter.run()). getMessage()); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java index 25bb654577c7f..60b538562251f 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java @@ -48,6 +48,7 @@ public enum Feature { GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), GroupVersion.LATEST_PRODUCTION), ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.values(), EligibleLeaderReplicasVersion.LATEST_PRODUCTION), SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), ShareVersion.LATEST_PRODUCTION), + STREAMS_VERSION(StreamsVersion.FEATURE_NAME, StreamsVersion.values(), StreamsVersion.LATEST_PRODUCTION), /** * Features defined only for unit tests and are not used in production. diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 7e64fa648f522..93be50cbb1375 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -127,7 +127,15 @@ public enum MetadataVersion { // *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE *** // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON *** // *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. *** - IBP_4_2_IV0(28, "4.2", "IV0", false); + IBP_4_2_IV0(28, "4.2", "IV0", false), + + // Enables "streams" groups by default for new clusters (KIP-1071). + // + // *** THIS IS A PLACEHOLDER UNSTABLE VERSION WHICH IS USED TO DEFINE THE POINT AT WHICH *** + // *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A STREAMS *** + // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON *** + // *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY. *** + IBP_4_2_IV1(29, "4.2", "IV1", false); // NOTES when adding a new version: // Update the default version in @ClusterTest annotation to point to the latest version diff --git a/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java new file mode 100644 index 0000000000000..cf910c660dc5a --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Map; + +public enum StreamsVersion implements FeatureVersion { + + // Version 0 keeps "streams" groups disabled (KIP-1071). + SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()), + + // Version 1 enables "streams" groups (KIP-1071). + // Using metadata version IBP_4_2_IV1 disables it by default in AK 4.1 release, and enables it by default in AK 4.2 release. + // - in AK 4.1, this can be enabled as "early access [unstable]" + // - in AK 4.2, it is planned to go GA (cf `LATEST_PRODUCTION`) + SV_1(1, MetadataVersion.IBP_4_2_IV1, Map.of()); + + public static final String FEATURE_NAME = "streams.version"; + + // Mark "streams" group as unstable in AK 4.1 release + // Needs to be updated to SV_1 in AK 4.2, to mark as stable + public static final StreamsVersion LATEST_PRODUCTION = SV_0; + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + StreamsVersion( + int featureLevel, + MetadataVersion bootstrapMetadataVersion, + Map dependencies + ) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + + public boolean streamsGroupSupported() { + return featureLevel >= SV_1.featureLevel; + } + + public static StreamsVersion fromFeatureLevel(short version) { + return switch (version) { + case 0 -> SV_0; + case 1 -> SV_1; + default -> throw new RuntimeException("Unknown streams feature level: " + (int) version); + }; + } +} diff --git a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java index 31ce9c596eea6..85c09248c1734 100644 --- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java +++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java @@ -30,6 +30,7 @@ import static org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION; import static org.apache.kafka.server.common.Feature.GROUP_VERSION; import static org.apache.kafka.server.common.Feature.SHARE_VERSION; +import static org.apache.kafka.server.common.Feature.STREAMS_VERSION; import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -99,6 +100,7 @@ public void testDefaultFinalizedFeatures() { GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(), ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(), SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(), + STREAMS_VERSION.featureName(), STREAMS_VERSION.latestTesting(), "kraft.version", (short) 0, "test_feature_1", (short) 4, "test_feature_2", (short) 3, diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 78eb64a9a268e..31e39a4c8b7d6 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -75,7 +75,7 @@ @Timeout(600) @Tag("integration") public class InternalTopicIntegrationTest { - public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); @BeforeAll public static void startCluster() throws IOException, InterruptedException { diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index 604fdd9b6a147..c40b3433a91a7 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -51,7 +51,7 @@ @Timeout(600) @Tag("integration") public class SmokeTestDriverIntegrationTest { - public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(3); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); public TestInfo testInfo; @BeforeAll diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index eb233e8210b7d..ec48b8b36349c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -57,7 +57,7 @@ public class StandbyTaskCreationIntegrationTest { private static final int NUM_BROKERS = 1; - public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private String safeTestName; diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 82100a772939e..6f29b7b81f515 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -89,7 +89,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { private static final long NOW = Instant.now().toEpochMilli(); - public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); @BeforeAll public static void startCluster() throws IOException { diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 6965135767f32..f425f8365ee57 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -143,16 +143,6 @@ public EmbeddedKafkaCluster(final int numBrokers, this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart); } - public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers) { - return withStreamsRebalanceProtocol(numBrokers, new Properties()); - } - - public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers, final Properties props) { - props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams"); - props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"); - return new EmbeddedKafkaCluster(numBrokers, props); - } - public void start() { try { cluster.format(); diff --git a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java index 097aac3094cb4..f81f2739907be 100644 --- a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java +++ b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java @@ -52,7 +52,7 @@ String brokerListener() default DEFAULT_BROKER_LISTENER_NAME; SecurityProtocol controllerSecurityProtocol() default SecurityProtocol.PLAINTEXT; String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME; - MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV0; + MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV1; ClusterConfigProperty[] serverProperties() default {}; // users can add tags that they want to display in test String[] tags() default {}; diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index a1ef4ff2e3929..f1c68eb0d3980 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -64,11 +64,13 @@ public void testDescribeWithKRaft(ClusterInstance cluster) { assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2))); assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" + - "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3))); + "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3))); assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4))); + assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5))); assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" + - "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5))); + "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6))); } // Use the first MetadataVersion that supports KIP-919 @@ -88,11 +90,13 @@ public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2))); assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" + - "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3))); + "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3))); assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4))); + assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5))); assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" + - "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5))); + "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6))); } @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV3) @@ -118,7 +122,7 @@ public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) { ); // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version) assertEquals("Could not disable metadata.version. The update failed for all features since the following " + - "feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput); + "feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-29", commandOutput); commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), @@ -182,6 +186,7 @@ public void testDowngradeWithReleaseVersion(ClusterInstance cluster) { "kraft.version was downgraded to 0.\n" + "metadata.version was downgraded to 18.\n" + "share.version was downgraded to 0.\n" + + "streams.version was downgraded to 0.\n" + "transaction.version was downgraded to 0.", commandOutput); }