Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue authored Nov 27, 2024
2 parents b25b85f + 5243fb9 · commit 21ce1f2
Showing 37 changed files with 470 additions and 573 deletions.
6 changes: 2 additions & 4 deletions README.md
@@ -4,10 +4,8 @@ See our [web site](https://kafka.apache.org) for details on the project.

You need to have [Java](http://www.oracle.com/technetwork/java/javase/downloads/index.html) installed.

We build and test Apache Kafka with 11, 17 and 21. We set the `release` parameter in javac and scalac
to `11` to ensure the generated binaries are compatible with Java 11 or higher (independently of the Java version
used for compilation). Java 11 support for the broker and tools has been deprecated since Apache Kafka 3.7 and removal
of both is planned for Apache Kafka 4.0.([KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).
We build and test Apache Kafka with 17 and 23. The `release` parameter in javac and scalac is set to `11` for the clients
and streams modules, and `17` for the broker and tools, ensuring compatibility with their respective minimum Java versions.

Scala 2.13 is the only supported version in Apache Kafka.

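To make the build note above concrete: unlike `-source`/`-target`, the `release` flag also pins the visible platform API, which is what keeps binaries compiled on a newer JDK usable on the minimum Java version. A minimal sketch, separate from the Kafka build itself (the class name and source path are assumptions):

import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class ReleaseFlagDemo {
    public static void main(String[] args) {
        // Requires a JDK; getSystemJavaCompiler() returns null on a bare JRE.
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        // --release 11 rejects post-11 platform APIs and emits Java 11 bytecode,
        // regardless of which JDK runs the compilation.
        int rc = compiler.run(null, null, null, "--release", "11", "src/Example.java");
        System.out.println(rc == 0 ? "compiled for Java 11" : "compilation failed");
    }
}
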
@@ -19,15 +19,11 @@

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;


/**
@@ -43,36 +39,6 @@ public class DescribeLogDirsResult {
this.futures = futures;
}

/**
* Return a map from brokerId to future which can be used to check the information of partitions on each individual broker.
* @deprecated Deprecated Since Kafka 2.7. Use {@link #descriptions()}.
*/
@Deprecated
public Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> values() {
return descriptions().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().thenApply(this::convertMapValues)));
}

@SuppressWarnings("deprecation")
private Map<String, DescribeLogDirsResponse.LogDirInfo> convertMapValues(Map<String, LogDirDescription> map) {
Stream<Map.Entry<String, LogDirDescription>> stream = map.entrySet().stream();
return stream.collect(Collectors.toMap(
Map.Entry::getKey,
infoEntry -> {
LogDirDescription logDir = infoEntry.getValue();
return new DescribeLogDirsResponse.LogDirInfo(logDir.error() == null ? Errors.NONE : Errors.forException(logDir.error()),
logDir.replicaInfos().entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
replicaEntry -> new DescribeLogDirsResponse.ReplicaInfo(
replicaEntry.getValue().size(),
replicaEntry.getValue().offsetLag(),
replicaEntry.getValue().isFuture())
)));
}));
}

/**
* Return a map from brokerId to future which can be used to check the information of partitions on each individual broker.
* The result of the future is a map from broker log directory path to a description of that log directory.
@@ -81,18 +47,6 @@ public Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions()
return futures;
}

/**
* Return a future which succeeds only if all the brokers have responded without error
* @deprecated Deprecated Since Kafka 2.7. Use {@link #allDescriptions()}.
*/
@Deprecated
public KafkaFuture<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> all() {
return allDescriptions().thenApply(map -> map.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> convertMapValues(entry.getValue())
)));
}

/**
* Return a future which succeeds only if all the brokers have responded without error.
* The result of the future is a map from brokerId to a map from broker log directory path
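For callers migrating off the removed `values()` and `all()` accessors, a hedged sketch of the replacement path via `allDescriptions()`, as the remaining javadoc directs (the bootstrap address and broker id are assumptions):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.LogDirDescription;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class DescribeLogDirsMigration {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (Admin admin = Admin.create(props)) {
            // Replacement for the removed all(): one future covering every queried broker.
            Map<Integer, Map<String, LogDirDescription>> byBroker =
                admin.describeLogDirs(Set.of(0)).allDescriptions().get(); // broker id 0 assumed
            byBroker.forEach((broker, dirs) -> dirs.forEach((path, desc) ->
                System.out.printf("broker %d dir %s error=%s%n", broker, path, desc.error())));
        }
    }
}

Note that `LogDirDescription` reports failures as an exception from `error()` rather than an `Errors` code, which is the gap the removed `convertMapValues()` bridged.
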
@@ -161,13 +161,23 @@ public synchronized void subscribe(Pattern pattern) {
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");
subscribe(pattern, Optional.of(listener));
}

@Override
public void subscribe(SubscriptionPattern pattern) {
throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
subscribe(pattern, Optional.empty());
}

private void subscribe(SubscriptionPattern pattern, Optional<ConsumerRebalanceListener> listener) {
if (pattern == null || pattern.toString().isEmpty())
throw new IllegalArgumentException("Topic pattern cannot be " + (pattern == null ? "null" : "empty"));
ensureNotClosed();
committed.clear();
this.subscriptions.subscribe(pattern, listener);
}

@Override
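A short usage sketch for the overloads `MockConsumer` now supports (the topic pattern is an assumption; `SubscriptionPattern` carries an RE2/J-style regex):

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;

public class MockRe2JSubscribe {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        // Pattern-only overload: the rebalance listener is Optional.empty() internally.
        consumer.subscribe(new SubscriptionPattern("orders-.*"));
        // Per this change, a null listener or a null/empty pattern now throws
        // IllegalArgumentException instead of UnsupportedOperationException:
        // consumer.subscribe(new SubscriptionPattern("orders-.*"), null);
        consumer.close();
    }
}
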
@@ -1799,8 +1799,10 @@ public void subscribe(Pattern pattern) {
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
subscribeToRegex(pattern, Optional.ofNullable(callback));
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");
subscribeToRegex(pattern, Optional.of(listener));
}

@Override
@@ -17,7 +17,6 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -69,71 +68,6 @@ public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) {
return new DescribeLogDirsResponse(new DescribeLogDirsResponseData(new ByteBufferAccessor(buffer), version));
}

// Note this class is part of the public API, reachable from Admin.describeLogDirs()
/**
* Possible error code:
*
* KAFKA_STORAGE_ERROR (56)
* UNKNOWN (-1)
*
* @deprecated Deprecated Since Kafka 2.7.
* Use {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()}
* and {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to access the replacement
* class {@link org.apache.kafka.clients.admin.LogDirDescription}.
*/
@Deprecated
public static class LogDirInfo {
public final Errors error;
public final Map<TopicPartition, ReplicaInfo> replicaInfos;

public LogDirInfo(Errors error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
this.error = error;
this.replicaInfos = replicaInfos;
}

@Override
public String toString() {
return "(error=" +
error +
", replicas=" +
replicaInfos +
")";
}
}

// Note this class is part of the public API, reachable from Admin.describeLogDirs()

/**
* @deprecated Deprecated Since Kafka 2.7.
* Use {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#descriptions()}
* and {@link org.apache.kafka.clients.admin.DescribeLogDirsResult#allDescriptions()} to access the replacement
* class {@link org.apache.kafka.clients.admin.ReplicaInfo}.
*/
@Deprecated
public static class ReplicaInfo {

public final long size;
public final long offsetLag;
public final boolean isFuture;

public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
this.size = size;
this.offsetLag = offsetLag;
this.isFuture = isFuture;
}

@Override
public String toString() {
return "(size=" +
size +
", offsetLag=" +
offsetLag +
", isFuture=" +
isFuture +
")";
}
}

@Override
public boolean shouldClientThrottle(short version) {
return version >= 1;
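With the deprecated `LogDirInfo` and `ReplicaInfo` holders gone, the per-partition fields live on `LogDirDescription.replicaInfos()`; a hedged sketch of the equivalent reads (admin client setup as in the earlier sketch, broker id assumed):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.LogDirDescription;

import java.util.Map;
import java.util.Set;

public class ReplicaInfoReads {
    static void printReplicaInfo(Admin admin) throws Exception {
        Map<String, LogDirDescription> dirs =
            admin.describeLogDirs(Set.of(0)).descriptions().get(0).get(); // broker 0 assumed
        dirs.forEach((path, d) -> d.replicaInfos().forEach((tp, r) ->
            // size/offsetLag/isFuture mirror the removed ReplicaInfo public fields.
            System.out.printf("%s %s size=%d lag=%d future=%b%n",
                path, tp, r.size(), r.offsetLag(), r.isFuture())));
    }
}
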
@@ -2320,50 +2320,6 @@ public void testDescribeLogDirsWithVolumeBytes() throws ExecutionException, InterruptedException {
}
}

@SuppressWarnings("deprecation")
@Test
public void testDescribeLogDirsDeprecated() throws ExecutionException, InterruptedException {
Set<Integer> brokers = singleton(0);
TopicPartition tp = new TopicPartition("topic", 12);
String logDir = "/var/data/kafka";
Errors error = Errors.NONE;
int offsetLag = 24;
long partitionSize = 1234567890;

try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(error, logDir, tp, partitionSize, offsetLag),
env.cluster().nodeById(0));

DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);

Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
assertEquals(brokers, deprecatedValues.keySet());
assertNotNull(deprecatedValues.get(0));
assertDescriptionContains(deprecatedValues.get(0).get(), logDir, tp, error, offsetLag, partitionSize);

Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
assertEquals(brokers, deprecatedAll.keySet());
assertDescriptionContains(deprecatedAll.get(0), logDir, tp, error, offsetLag, partitionSize);
}
}

@SuppressWarnings("deprecation")
private static void assertDescriptionContains(Map<String, DescribeLogDirsResponse.LogDirInfo> descriptionsMap,
String logDir, TopicPartition tp, Errors error,
int offsetLag, long partitionSize) {
assertNotNull(descriptionsMap);
assertEquals(singleton(logDir), descriptionsMap.keySet());
assertEquals(error, descriptionsMap.get(logDir).error);
Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> allReplicaInfos =
descriptionsMap.get(logDir).replicaInfos;
assertEquals(singleton(tp), allReplicaInfos.keySet());
assertEquals(partitionSize, allReplicaInfos.get(tp).size);
assertEquals(offsetLag, allReplicaInfos.get(tp).offsetLag);
assertFalse(allReplicaInfos.get(tp).isFuture);
}

@Test
public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException {
Set<Integer> brokers = singleton(0);
@@ -2396,39 +2352,6 @@ public void testDescribeLogDirsOfflineDir() throws ExecutionException, InterruptedException {
}
}

@SuppressWarnings("deprecation")
@Test
public void testDescribeLogDirsOfflineDirDeprecated() throws ExecutionException, InterruptedException {
Set<Integer> brokers = singleton(0);
String logDir = "/var/data/kafka";
Errors error = Errors.KAFKA_STORAGE_ERROR;

try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(error, logDir, emptyList()),
env.cluster().nodeById(0));

DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);

Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> deprecatedValues = result.values();
assertEquals(brokers, deprecatedValues.keySet());
assertNotNull(deprecatedValues.get(0));
Map<String, DescribeLogDirsResponse.LogDirInfo> valuesMap = deprecatedValues.get(0).get();
assertEquals(singleton(logDir), valuesMap.keySet());
assertEquals(error, valuesMap.get(logDir).error);
assertEquals(emptySet(), valuesMap.get(logDir).replicaInfos.keySet());

Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> deprecatedAll = result.all().get();
assertEquals(brokers, deprecatedAll.keySet());
Map<String, DescribeLogDirsResponse.LogDirInfo> allMap = deprecatedAll.get(0);
assertNotNull(allMap);
assertEquals(singleton(logDir), allMap.keySet());
assertEquals(error, allMap.get(logDir).error);
assertEquals(emptySet(), allMap.get(logDir).replicaInfos.keySet());
}
}

@Test
public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException {
TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 12, 1);
@@ -35,6 +35,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class MockConsumerTest {
@@ -163,5 +164,20 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
assertEquals(1, revoked.size());
assertTrue(revoked.contains(topicPartitionList.get(0)));
}

@Test
public void testRe2JPatternSubscription() {
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((SubscriptionPattern) null));
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("")));

SubscriptionPattern pattern = new SubscriptionPattern("t.*");
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(pattern, null));

consumer.subscribe(pattern);
assertTrue(consumer.subscription().isEmpty());
// Check that the subscription to pattern was successfully applied in the mock consumer (using a different
// subscription type should fail)
assertThrows(IllegalStateException.class, () -> consumer.subscribe(List.of("topic1")));
}

}
@@ -1864,6 +1864,9 @@ public void testSubscribeToRe2JPatternValidation() {
assertEquals("Topic pattern to subscribe to cannot be empty", t.getMessage());

assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*")));

assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("t*"), null));
assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -444,35 +444,6 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get(), "Disabling remote storage feature on the topic level is not supported.")
}

@ParameterizedTest
@ValueSource(strings = Array("zk"))
def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = {
val admin = createAdminClient()
val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or `remote.log.copy.disable` under Zookeeper's mode."
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)

val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET),
))
assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)

configs.clear()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
AlterConfigOp.OpType.SET),
))
assertThrowsException(classOf[InvalidConfigurationException],
() => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
}

@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testTopicDeletion(quorum: String): Unit = {
@@ -501,7 +472,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)

val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
val tsDisabledProps = TestUtils.createBrokerConfigs(1, null).head
instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))

recreateBrokers(startup = true)
@@ -519,7 +490,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)

val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
val tsDisabledProps = TestUtils.createBrokerConfigs(1, null).head
instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))

recreateBrokers(startup = true)
@@ -181,7 +181,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
* is thrown immediately, and is not affected by <code>connection.failed.authentication.delay.ms</code>.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testServerAuthenticationFailure(quorum: String, groupProtocol: String): Unit = {
// Setup client with a non-existent service principal, so that server authentication fails on the client
val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism, Some("another-kafka-service"))