Skip to content

Commit

Permalink
KAFKA-16968: Introduce 3.8-IV0, 3.9-IV0, 3.9-IV1
Browse files Browse the repository at this point in the history
Create 3 new metadata versions:

- 3.8-IV0, for the upcoming 3.8 release.
- 3.9-IV0, to add support for KIP-1005.
- 3.9-IV1, as the new release vehicle for KIP-966.

Create ListOffsetRequest v9, which will be used in 3.9-IV0 to support KIP-1005. v9 is currently an unstable API version.

Reviewers: Jun Rao <junrao@gmail.com>, Justine Olshan <jolshan@confluent.io>
  • Loading branch information
cmccabe authored Jun 27, 2024
1 parent dc7c9ad commit ebaa108
Show file tree
Hide file tree
Showing 18 changed files with 97 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
// Version 7 enables listing offsets by max timestamp (KIP-734).
//
// Version 8 enables listing offsets by local log start offset (KIP-405).
"validVersions": "0-8",
//
// Version 9 enables listing offsets by last tiered offset (KIP-1005).
"validVersions": "0-9",
"deprecatedVersions": "0",
"flexibleVersions": "6+",
"latestVersionUnstable": true,
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The broker ID of the requester, or -1 if this request is being made by a normal consumer." },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
//
// Version 8 enables listing offsets by local log start offset.
// This is the earliest log start offset in the local log. (KIP-405).
"validVersions": "0-8",
//
// Version 9 enables listing offsets by last tiered offset (KIP-1005).
"validVersions": "0-9",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void testNoAutoStart() {

@ClusterTest
public void testDefaults(ClusterInstance clusterInstance) {
Assertions.assertEquals(MetadataVersion.IBP_4_0_IV0, clusterInstance.config().metadataVersion());
Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion());
}

@ClusterTests({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ object ZkMigrationIntegrationTest {
MetadataVersion.IBP_3_7_IV2,
MetadataVersion.IBP_3_7_IV4,
MetadataVersion.IBP_3_8_IV0,
MetadataVersion.IBP_4_0_IV0
MetadataVersion.IBP_3_9_IV0,
MetadataVersion.IBP_3_9_IV1
// Note: ZK Migration is not supported in Apache Kafka 4.0 and beyond.
).map { mv =>
val serverProperties = new util.HashMap[String, String]()
serverProperties.put("inter.broker.listener.name", "EXTERNAL")
Expand Down Expand Up @@ -492,7 +494,7 @@ class ZkMigrationIntegrationTest {
}
}

@ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
@ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_9_IV1, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
Expand All @@ -514,7 +516,7 @@ class ZkMigrationIntegrationTest {
val clusterId = zkCluster.clusterId()
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_8_IV0).
setBootstrapMetadataVersion(MetadataVersion.IBP_3_9_IV1).
setClusterId(Uuid.fromString(clusterId)).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
Expand Down
17 changes: 10 additions & 7 deletions core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object ApiVersionsRequestTest {
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
.setMetadataVersion(MetadataVersion.IBP_4_0_IV0)
.setMetadataVersion(MetadataVersion.latestTesting())
.build()).asJava
}

Expand All @@ -67,7 +67,7 @@ object ApiVersionsRequestTest {
serverProperties.put("unstable.feature.versions.enable", "false")
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setMetadataVersion(MetadataVersion.IBP_3_7_IV4)
.setMetadataVersion(MetadataVersion.latestProduction())
.build()).asJava
}

Expand All @@ -83,7 +83,7 @@ object ApiVersionsRequestTest {
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {

@ClusterTemplate("testApiVersionsRequestTemplate")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_4_0_IV0, serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true")
))
Expand All @@ -108,7 +108,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controlPlaneListenerName().get())
validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get())
validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get(), true)
}

@ClusterTest(types = Array(Type.KRAFT))
Expand All @@ -131,22 +131,25 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
}

// Use the latest production MV for this test
@ClusterTemplate("testApiVersionsRequestValidationV0Template")
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "false"),
))
def testApiVersionsRequestValidationV0(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0)
validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0,
enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable")))
}

@ClusterTemplate("zkApiVersionsRequest")
def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controlPlaneListenerName().get())
validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get())
validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get(), true)
}

@ClusterTest(types = Array(Type.KRAFT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,27 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import java.util.Optional
import java.util.{Optional, Properties}
import scala.collection.Seq
import scala.jdk.CollectionConverters._

class ListOffsetsRequestTest extends BaseRequestTest {

val topic = "topic"
val partition = new TopicPartition(topic, 0)

override def modifyConfigs(props: Seq[Properties]): Unit = {
super.modifyConfigs(props)
props.foreach { p =>
p.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
}
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testListOffsetsErrorCodes(quorum: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(MetadataVersion.IBP_3_0_IV1.featureLevel())
.setMaxSupportedVersion(MetadataVersion.IBP_4_0_IV0.featureLevel()))
.setMaxSupportedVersion(MetadataVersion.latestTesting().featureLevel()))
controllerServer.controller.registerBroker(
ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
try {
val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse)
validateApiVersionsResponse(apiVersionsResponse,
enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable")))
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
Expand Down Expand Up @@ -113,7 +115,9 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
validateApiVersionsResponse(apiVersionsResponse2)
validateApiVersionsResponse(apiVersionsResponse2,
enableUnstableLastVersion = !"false".equals(
cluster.config().serverProperties().get("unstable.api.versions.enable")))
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ object TestUtils extends Logging {

val props = new Properties
props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true")
props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
if (zkConnect == null) {
props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, TimeUnit.MINUTES.toMillis(10).toString)
props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private static MetadataVersion metadataVersionForPartitionChangeRecordVersion(sh
case (short) 1:
return MetadataVersion.IBP_3_7_IV2;
case (short) 2:
return MetadataVersion.IBP_3_8_IV0;
return MetadataVersion.IBP_3_9_IV1;
default:
throw new RuntimeException("Unknown PartitionChangeRecord version " + version);
}
Expand Down Expand Up @@ -313,7 +313,7 @@ public void testNoLeaderEpochBumpIfNothingChanged(short version) {
* Test that shrinking the ISR doesn't increase the leader epoch in later MVs.
*/
@ParameterizedTest
@ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
@ValueSource(strings = {"3.6-IV0", "3.7-IV2", "3.9-IV1"})
public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
Expand Down Expand Up @@ -342,7 +342,7 @@ public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
* Test that shrinking the ISR does increase the leader epoch in later MVs when ZK migration is on.
*/
@ParameterizedTest
@ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
@ValueSource(strings = {"3.6-IV0", "3.7-IV2", "3.9-IV1"})
public void testLeaderEpochBumpOnIsrShrinkWithZkMigration(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
Expand All @@ -358,7 +358,7 @@ public void testLeaderEpochBumpOnIsrShrinkWithZkMigration(String metadataVersion
* Test that expanding the ISR doesn't increase the leader epoch.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "3.9-IV1"})
public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
Expand All @@ -372,7 +372,7 @@ public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) {
* Test that expanding the ISR doesn't increase the leader epoch during ZK migration.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "3.9-IV1"})
public void testNoLeaderEpochBumpOnIsrExpansionDuringMigration(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
Expand All @@ -389,7 +389,7 @@ public void testNoLeaderEpochBumpOnIsrExpansionDuringMigration(String metadataVe
* always results in a leader epoch increase.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2", "3.9-IV1"})
public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
Expand All @@ -403,7 +403,7 @@ public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionStr
* cannot actually change the ISR, triggerLeaderEpochBumpForIsrShrinkIfNeeded does not engage.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV2"})
public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
PartitionRegistration partition = new PartitionRegistration.Builder().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void testConfigurationOperations() throws Throwable {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
setClusterId(logEnv.clusterId())).get();
Expand Down Expand Up @@ -241,7 +241,7 @@ public void testDelayedConfigurationOperations() throws Throwable {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
setClusterId(logEnv.clusterId())).get();
Expand Down Expand Up @@ -299,7 +299,7 @@ public void testFenceMultipleBrokers() throws Throwable {
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())).
setIncarnationId(Uuid.randomUuid()).
setListeners(listeners));
brokerEpochs.put(brokerId, reply.get().epoch());
Expand Down Expand Up @@ -381,7 +381,7 @@ public void testUncleanShutdownBroker() throws Throwable {
}).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).

setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_8_IV0, "test-provided bootstrap ELR enabled")).
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_9_IV1, "test-provided bootstrap ELR enabled")).
build()
) {
ListenerCollection listeners = new ListenerCollection();
Expand All @@ -395,7 +395,7 @@ public void testUncleanShutdownBroker() throws Throwable {
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_9_IV1)).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners));
Expand Down Expand Up @@ -464,7 +464,7 @@ public void testUncleanShutdownBroker() throws Throwable {
new BrokerRegistrationRequestData().
setBrokerId(brokerToUncleanShutdown).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_9_IV1)).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners)).get();
Expand All @@ -477,7 +477,7 @@ public void testUncleanShutdownBroker() throws Throwable {
new BrokerRegistrationRequestData().
setBrokerId(lastKnownElr[0]).
setClusterId(active.clusterId()).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_9_IV1)).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners)).get();
Expand Down Expand Up @@ -718,7 +718,7 @@ public void testUnregisterBroker() throws Throwable {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)).
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting())).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners));
assertEquals(5L, reply.get().epoch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ private static Stream<Arguments> metadataVersionsForTestPartitionRegistration()
return Stream.of(
MetadataVersion.IBP_3_7_IV1,
MetadataVersion.IBP_3_7_IV2,
MetadataVersion.IBP_3_8_IV0
MetadataVersion.IBP_3_9_IV1
).map(mv -> Arguments.of(mv));
}

Expand Down Expand Up @@ -373,7 +373,7 @@ public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() {
setPartitionEpoch(0);
List<UnwritableMetadataException> exceptions = new ArrayList<>();
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_8_IV0).
setMetadataVersion(MetadataVersion.IBP_3_9_IV1).
setLossHandler(exceptions::add).
build();
assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options));
Expand Down
Loading

0 comments on commit ebaa108

Please sign in to comment.