Skip to content

Commit fa96d44

Browse files
committed
Merge branch 'axbaretto'
* axbaretto: KAFKA-12156: Document single threaded response handling in Admin client (#9842)
2 parents 97f5f21 + ec67963 commit fa96d44

File tree

318 files changed

+3749
-3724
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

318 files changed

+3749
-3724
lines changed

build.gradle

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ subprojects {
241241
}
242242
}
243243

244-
def shouldUseJUnit5 = ["tools", "raft"].contains(it.project.name)
244+
def shouldUseJUnit5 = ["tools", "raft", "log4j-appender"].contains(it.project.name)
245245
def testLoggingEvents = ["passed", "skipped", "failed"]
246246
def testShowStandardStreams = false
247247
def testExceptionFormat = 'full'
@@ -1157,6 +1157,7 @@ project(':clients') {
11571157
include "**/org/apache/kafka/common/header/*"
11581158
include "**/org/apache/kafka/common/metrics/*"
11591159
include "**/org/apache/kafka/common/metrics/stats/*"
1160+
include "**/org/apache/kafka/common/quota/*"
11601161
include "**/org/apache/kafka/common/resource/*"
11611162
include "**/org/apache/kafka/common/serialization/*"
11621163
include "**/org/apache/kafka/common/config/*"
@@ -1782,8 +1783,8 @@ project(':log4j-appender') {
17821783
compile libs.slf4jlog4j
17831784

17841785
testCompile project(':clients').sourceSets.test.output
1785-
testCompile libs.junitJupiterApi
1786-
testCompile libs.junitVintageEngine
1786+
testCompile libs.junitJupiter
1787+
testCompile libs.hamcrest
17871788
testCompile libs.easymock
17881789
}
17891790

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1171,7 +1171,7 @@ private InProgressData(int requestVersion, boolean isPartialUpdate) {
11711171
this.requestVersion = requestVersion;
11721172
this.isPartialUpdate = isPartialUpdate;
11731173
}
1174-
};
1174+
}
11751175

11761176
}
11771177

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3131
import org.apache.kafka.common.ElectionType;
32+
import org.apache.kafka.common.KafkaFuture;
3233
import org.apache.kafka.common.Metric;
3334
import org.apache.kafka.common.MetricName;
3435
import org.apache.kafka.common.TopicPartition;
@@ -46,6 +47,10 @@
4647
* The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
4748
* <p>
4849
* Instances returned from the {@code create} methods of this interface are guaranteed to be thread safe.
50+
* However, the {@link KafkaFuture KafkaFutures} returned from request methods are executed
51+
* by a single thread so it is important that any code which executes on that thread when they complete
52+
* (using {@link KafkaFuture#thenApply(KafkaFuture.Function)}, for example) doesn't block
53+
* for too long. If necessary, processing of results should be passed to another thread.
4954
* <p>
5055
* The operations exposed by Admin follow a consistent pattern:
5156
* <ul>
@@ -57,11 +62,11 @@
5762
* preferred over multiple calls to the same method.
5863
* <li>The operation methods execute asynchronously.
5964
* <li>Each {@code xxx} operation method returns an {@code XxxResult} class with methods which expose
60-
* {@link org.apache.kafka.common.KafkaFuture} for accessing the result(s) of the operation.
65+
* {@link KafkaFuture} for accessing the result(s) of the operation.
6166
* <li>Typically an {@code all()} method is provided for getting the overall success/failure of the batch and a
6267
* {@code values()} method provided access to each item in a request batch.
6368
* Other methods may also be provided.
64-
* <li>For synchronous behaviour use {@link org.apache.kafka.common.KafkaFuture#get()}
69+
* <li>For synchronous behaviour use {@link KafkaFuture#get()}
6570
* </ul>
6671
* <p>
6772
* Here is a simple example of using an Admin client instance to create a new topic:

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,18 @@ public class ConsumerConfig extends AbstractConfig {
108108
*/
109109
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
110110
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
111-
"ordered by preference, of supported partition assignment " +
112-
"strategies that the client will use to distribute partition " +
113-
"ownership amongst consumer instances when group management is " +
114-
"used.<p>In addition to the default class specified below, " +
115-
"you can use the " +
116-
"<code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>" +
117-
"class for round robin assignments of partitions to consumers. " +
118-
"</p><p>Implementing the " +
119-
"<code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor" +
120-
"</code> interface allows you to plug in a custom assignment" +
121-
"strategy.";
111+
"ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
112+
"partition ownership amongst consumer instances when group management is used. Available options are:" +
113+
"<ul>" +
114+
"<li><code>org.apache.kafka.clients.consumer.RangeAssignor</code>: The default assignor, which works on a per-topic basis.</li>" +
115+
"<li><code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.</li>" +
116+
"<li><code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is " +
117+
"maximally balanced while preserving as many existing partition assignments as possible.</li>" +
118+
"<li><code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor " +
119+
"logic, but allows for cooperative rebalancing.</li>" +
120+
"</ul>" +
121+
"<p>Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> " +
122+
"interface allows you to plug in a custom assignment strategy.";
122123

123124
/**
124125
* <code>auto.offset.reset</code>

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void await(long timeout, TimeUnit unit) {
7171
throw error();
7272
}
7373
if (!success) {
74-
throw new TimeoutException("Timeout expired after " + timeout + unit.name().toLowerCase(Locale.ROOT) + " while awaiting " + operation);
74+
throw new TimeoutException("Timeout expired after " + timeout + " " + unit.name().toLowerCase(Locale.ROOT) + " while awaiting " + operation);
7575
}
7676
} catch (InterruptedException e) {
7777
throw new InterruptException("Received interrupt while awaiting " + operation, e);

clients/src/main/java/org/apache/kafka/common/Cluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,9 @@ public List<Node> nodes() {
232232
}
233233

234234
/**
235-
* Get the node by the node id (or null if no such node exists)
235+
* Get the node by the node id (or null if the node is not online or does not exist)
236236
* @param id The id of the node
237-
* @return The node, or null if no such node exists
237+
* @return The node, or null if the node is not online or does not exist
238238
*/
239239
public Node nodeById(int id) {
240240
return this.nodesById.get(id);

clients/src/main/resources/common/message/OffsetDeleteRequest.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
"name": "OffsetDeleteRequest",
2020
"validVersions": "0",
2121
"fields": [
22-
{ "name": "GroupId", "type": "string", "versions": "0+",
22+
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
2323
"about": "The unique group identifier." },
2424
{ "name": "Topics", "type": "[]OffsetDeleteRequestTopic", "versions": "0+",
2525
"about": "The topics to delete offsets for", "fields": [
26-
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
26+
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
2727
"about": "The topic name." },
2828
{ "name": "Partitions", "type": "[]OffsetDeleteRequestPartition", "versions": "0+",
2929
"about": "Each partition to delete offsets for.", "fields": [

clients/src/main/resources/common/message/OffsetDeleteResponse.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
2626
{ "name": "Topics", "type": "[]OffsetDeleteResponseTopic", "versions": "0+",
2727
"about": "The responses for each topic.", "fields": [
28-
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
28+
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
2929
"about": "The topic name." },
3030
{ "name": "Partitions", "type": "[]OffsetDeleteResponsePartition", "versions": "0+",
3131
"about": "The responses for each partition in the topic.", "fields": [

clients/src/main/resources/common/message/TxnOffsetCommitRequest.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
"validVersions": "0-3",
2626
"flexibleVersions": "3+",
2727
"fields": [
28-
{ "name": "TransactionalId", "type": "string", "versions": "0+",
28+
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
2929
"about": "The ID of the transaction." },
3030
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
3131
"about": "The ID of the group." },

clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
import java.net.InetSocketAddress;
2121
import java.net.UnknownHostException;
2222
import java.util.Arrays;
23+
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.stream.Collectors;
2526
import org.apache.kafka.common.config.ConfigException;
2627
import static org.junit.Assert.assertEquals;
2728
import static org.junit.Assert.assertFalse;
29+
import static org.junit.Assert.assertThrows;
2830
import static org.junit.Assert.assertTrue;
2931
import org.junit.Test;
3032

@@ -62,19 +64,20 @@ public void testParseAndValidateAddressesWithReverseLookup() {
6264
validatedAddresses.forEach(address -> assertEquals(10000, address.getPort()));
6365
}
6466

65-
@Test(expected = IllegalArgumentException.class)
67+
@Test
6668
public void testInvalidConfig() {
67-
ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:10000"), "random.value");
69+
assertThrows(IllegalArgumentException.class,
70+
() -> ClientUtils.parseAndValidateAddresses(Collections.singletonList("localhost:10000"), "random.value"));
6871
}
6972

70-
@Test(expected = ConfigException.class)
73+
@Test
7174
public void testNoPort() {
72-
checkWithoutLookup("127.0.0.1");
75+
assertThrows(ConfigException.class, () -> checkWithoutLookup("127.0.0.1"));
7376
}
7477

75-
@Test(expected = ConfigException.class)
78+
@Test
7679
public void testOnlyBadHostname() {
77-
checkWithoutLookup("some.invalid.hostname.foo.bar.local:9999");
80+
assertThrows(ConfigException.class, () -> checkWithoutLookup("some.invalid.hostname.foo.bar.local:9999"));
7881
}
7982

8083
@Test
@@ -95,9 +98,10 @@ public void testFilterPreferredAddresses() throws UnknownHostException {
9598
assertEquals(1, result.size());
9699
}
97100

98-
@Test(expected = UnknownHostException.class)
99-
public void testResolveUnknownHostException() throws UnknownHostException {
100-
ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS);
101+
@Test
102+
public void testResolveUnknownHostException() {
103+
assertThrows(UnknownHostException.class,
104+
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS));
101105
}
102106

103107
@Test

clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.List;
3131

3232
import static org.junit.Assert.assertEquals;
33+
import static org.junit.Assert.assertThrows;
3334

3435
public class InFlightRequestsTest {
3536

@@ -99,14 +100,14 @@ public void testCompleteNext() {
99100
assertEquals(0, inFlightRequests.count());
100101
}
101102

102-
@Test(expected = IllegalStateException.class)
103+
@Test
103104
public void testCompleteNextThrowsIfNoInflights() {
104-
inFlightRequests.completeNext(dest);
105+
assertThrows(IllegalStateException.class, () -> inFlightRequests.completeNext(dest));
105106
}
106107

107-
@Test(expected = IllegalStateException.class)
108+
@Test
108109
public void testCompleteLastSentThrowsIfNoInFlights() {
109-
inFlightRequests.completeLastSent(dest);
110+
assertThrows(IllegalStateException.class, () -> inFlightRequests.completeLastSent(dest));
110111
}
111112

112113
private int addRequest(String destination) {

clients/src/test/java/org/apache/kafka/clients/MetadataTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ private static MetadataResponse emptyMetadataResponse() {
7676
Collections.emptyList());
7777
}
7878

79-
@Test(expected = IllegalStateException.class)
79+
@Test
8080
public void testMetadataUpdateAfterClose() {
8181
metadata.close();
82-
metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 1000);
82+
assertThrows(IllegalStateException.class, () -> metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 1000));
8383
}
8484

8585
private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) {

clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import static org.junit.Assert.assertNotEquals;
6464
import static org.junit.Assert.assertNotNull;
6565
import static org.junit.Assert.assertNull;
66+
import static org.junit.Assert.assertThrows;
6667
import static org.junit.Assert.assertTrue;
6768

6869
public class NetworkClientTest {
@@ -119,13 +120,12 @@ public void setup() {
119120
selector.reset();
120121
}
121122

122-
@Test(expected = IllegalStateException.class)
123+
@Test
123124
public void testSendToUnreadyNode() {
124125
MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.singletonList("test"), true);
125126
long now = time.milliseconds();
126127
ClientRequest request = client.newClientRequest("5", builder, now, false);
127-
client.send(request, now);
128-
client.poll(1, time.milliseconds());
128+
assertThrows(IllegalStateException.class, () -> client.send(request, now));
129129
}
130130

131131
@Test

clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import static org.junit.Assert.assertEquals;
3030
import static org.junit.Assert.assertNull;
31+
import static org.junit.Assert.assertThrows;
3132
import static org.junit.Assert.assertTrue;
3233

3334
public class NodeApiVersionsTest {
@@ -103,28 +104,32 @@ public void testLatestUsableVersion() {
103104
assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4));
104105
}
105106

106-
@Test(expected = UnsupportedVersionException.class)
107+
@Test
107108
public void testLatestUsableVersionOutOfRangeLow() {
108109
NodeApiVersions apiVersions = NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 1, (short) 2);
109-
apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4);
110+
assertThrows(UnsupportedVersionException.class,
111+
() -> apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4));
110112
}
111113

112-
@Test(expected = UnsupportedVersionException.class)
114+
@Test
113115
public void testLatestUsableVersionOutOfRangeHigh() {
114116
NodeApiVersions apiVersions = NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 2, (short) 3);
115-
apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1);
117+
assertThrows(UnsupportedVersionException.class,
118+
() -> apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1));
116119
}
117120

118-
@Test(expected = UnsupportedVersionException.class)
121+
@Test
119122
public void testUsableVersionCalculationNoKnownVersions() {
120123
NodeApiVersions versions = new NodeApiVersions(new ApiVersionsResponseKeyCollection());
121-
versions.latestUsableVersion(ApiKeys.FETCH);
124+
assertThrows(UnsupportedVersionException.class,
125+
() -> versions.latestUsableVersion(ApiKeys.FETCH));
122126
}
123127

124-
@Test(expected = UnsupportedVersionException.class)
128+
@Test
125129
public void testLatestUsableVersionOutOfRange() {
126130
NodeApiVersions apiVersions = NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 300, (short) 300);
127-
apiVersions.latestUsableVersion(ApiKeys.PRODUCE);
131+
assertThrows(UnsupportedVersionException.class,
132+
() -> apiVersions.latestUsableVersion(ApiKeys.PRODUCE));
128133
}
129134

130135
@Test

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,7 +1067,7 @@ Errors.NONE, new TopicPartition(topic, 0), Optional.of(leader.id()), Optional.of
10671067
DescribeTopicsResult result = env.adminClient().describeTopics(Collections.singleton(topic));
10681068
Map<String, TopicDescription> topicDescriptions = result.all().get();
10691069
assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());
1070-
assertEquals(null, topicDescriptions.get(topic).authorizedOperations());
1070+
assertNull(topicDescriptions.get(topic).authorizedOperations());
10711071
}
10721072
}
10731073

@@ -1247,9 +1247,9 @@ public void testDeleteAcls() throws Exception {
12471247
DeleteAclsResult results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
12481248
Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.values();
12491249
FilterResults filter1Results = filterResults.get(FILTER1).get();
1250-
assertEquals(null, filter1Results.values().get(0).exception());
1250+
assertNull(filter1Results.values().get(0).exception());
12511251
assertEquals(ACL1, filter1Results.values().get(0).binding());
1252-
assertEquals(null, filter1Results.values().get(1).exception());
1252+
assertNull(filter1Results.values().get(1).exception());
12531253
assertEquals(ACL2, filter1Results.values().get(1).binding());
12541254
TestUtils.assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
12551255
TestUtils.assertFutureError(results.all(), SecurityDisabledException.class);
@@ -2068,7 +2068,7 @@ public void testDescribeCluster() throws Exception {
20682068
final DescribeClusterResult result = env.adminClient().describeCluster();
20692069
assertEquals(env.cluster().clusterResource().clusterId(), result.clusterId().get());
20702070
assertEquals(2, result.controller().get().id());
2071-
assertEquals(null, result.authorizedOperations().get());
2071+
assertNull(result.authorizedOperations().get());
20722072

20732073
// Test DescribeCluster with the authorized operations included.
20742074
final DescribeClusterResult result2 = env.adminClient().describeCluster();

0 commit comments

Comments
 (0)