From 36cb999ab75459a293f1cf1d47aefc6795ce6758 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 21 Apr 2025 13:27:24 +0800 Subject: [PATCH] KAFKA-17747: Add compute topic and group hash Signed-off-by: PoAn Yang --- build.gradle | 1 + .../import-control-group-coordinator.xml | 1 + gradle/dependencies.gradle | 2 + .../apache/kafka/coordinator/group/Group.java | 56 ++++++ .../kafka/coordinator/group/GroupTest.java | 189 ++++++++++++++++++ 5 files changed, 249 insertions(+) create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java diff --git a/build.gradle b/build.gradle index 2e35057165c53..61b68edfe67de 100644 --- a/build.gradle +++ b/build.gradle @@ -1420,6 +1420,7 @@ project(':group-coordinator') { implementation libs.hdrHistogram implementation libs.re2j implementation libs.slf4jApi + implementation libs.guava testImplementation project(':clients').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml index 8b6a8d99f5eaa..341ac8984ab93 100644 --- a/checkstyle/import-control-group-coordinator.xml +++ b/checkstyle/import-control-group-coordinator.xml @@ -77,6 +77,7 @@ + <allow pkg="com.google.common.hash" /> diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index f6f7bd68e8363..a4e94e44080a5 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -61,6 +61,7 @@ versions += [ classgraph: "4.8.173", gradle: "8.10.2", grgit: "4.1.1", + guava: "33.4.0-jre", httpclient: "4.5.14", jackson: "2.16.2", jacoco: "0.8.10", @@ -147,6 +148,7 @@ libs += [ caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine", classgraph: "io.github.classgraph:classgraph:$versions.classgraph", commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator", + guava: "com.google.guava:guava:$versions.guava", jacksonAnnotations: 
"com.fasterxml.jackson.core:jackson-annotations:$versions.jackson", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson", diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 54d7e98d4b7be..34b17ff2533f4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -19,11 +19,21 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.image.ClusterImage; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.metadata.BrokerRegistration; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -209,4 +219,50 @@ void validateOffsetFetch( default boolean shouldExpire() { return true; } + + /** + * Computes the hash of the topics in a group. + * + * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. + * @return The hash of the group. 
+ */ + static long computeGroupHash(Map<String, Long> topicHashes) { + return Hashing.combineOrdered( + topicHashes.entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> HashCode.fromLong(e.getValue())) + .toList() + ).asLong(); + } + + /** + * Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3. + * + * @param topicImage The topic image. + * @param clusterImage The cluster image. + * @return The hash of the topic. + */ + static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) { + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(topicImage.id().hashCode()) // topic Id + .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name + .putInt(topicImage.partitions().size()); // number of partitions + + topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> { + topicHasher.putInt(entry.getKey()); // partition id + String racks = Arrays.stream(entry.getValue().replicas) + .mapToObj(clusterImage::broker) + .filter(Objects::nonNull) + .map(BrokerRegistration::rack) + .filter(Optional::isPresent) + .map(Optional::get) + .sorted() + .collect(Collectors.joining(";")); + topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";" + }); + return topicHasher.hash().asLong(); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java new file mode 100644 index 0000000000000..679bcfdf3e1e4 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.image.MetadataImage; + +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class GroupTest { + private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid(); + private static final String FOO_TOPIC_NAME = "foo"; + private static final String BAR_TOPIC_NAME = "bar"; + private static final int FOO_NUM_PARTITIONS = 2; + private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder() + .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) + .addRacks() + .build(); + + @Test + void testComputeTopicHash() { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = 
hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(FOO_TOPIC_ID.hashCode()) // topic Id + .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name + .putInt(FOO_NUM_PARTITIONS) // number of partitions + .putInt(0) // partition 0 + .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0 + .putInt(1) // partition 1 + .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 + assertEquals(topicHasher.hash().asLong(), result); + } + + @Test + void testComputeTopicHashWithDifferentMagicByte() { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 1) // different magic byte + .putLong(FOO_TOPIC_ID.hashCode()) // topic Id + .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name + .putInt(FOO_NUM_PARTITIONS) // number of partitions + .putInt(0) // partition 0 + .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0 + .putInt(1) // partition 1 + .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 + assertNotEquals(topicHasher.hash().asLong(), result); + } + + @Test + void testComputeTopicHashWithDifferentPartitionOrder() { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(FOO_TOPIC_ID.hashCode()) // topic Id + .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name + .putInt(FOO_NUM_PARTITIONS) // number of partitions + // different partition order + .putInt(1) // partition 1 + .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1 + .putInt(0) // partition 0 + .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0 + assertNotEquals(topicHasher.hash().asLong(), 
result); + } + + @Test + void testComputeTopicHashWithDifferentRackOrder() { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + HashFunction hf = Hashing.murmur3_128(); + Hasher topicHasher = hf.newHasher() + .putByte((byte) 0) // magic byte + .putLong(FOO_TOPIC_ID.hashCode()) // topic Id + .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name + .putInt(FOO_NUM_PARTITIONS) // number of partitions + .putInt(0) // partition 0 + .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0 + .putInt(1) // partition 1 + .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1 + assertNotEquals(topicHasher.hash().asLong(), result); + } + + @ParameterizedTest + @MethodSource("differentFieldGenerator") + void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) { + long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster()); + + assertNotEquals( + Group.computeTopicHash( + differentImage.topics().getTopic(topicId), + differentImage.cluster() + ), + result + ); + } + + private static Stream<Arguments> differentFieldGenerator() { + Uuid differentTopicId = Uuid.randomUuid(); + return Stream.of( + Arguments.of(new MetadataImageBuilder() // different topic id + .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) + .addRacks() + .build(), + differentTopicId + ), + Arguments.of(new MetadataImageBuilder() // different topic name + .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS) + .addRacks() + .build(), + FOO_TOPIC_ID + ), + Arguments.of(new MetadataImageBuilder() // different partitions + .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1) + .addRacks() + .build(), + FOO_TOPIC_ID + ), + Arguments.of(new MetadataImageBuilder() // different racks + .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS) + .build(), + FOO_TOPIC_ID + ) + ); + } + + @Test + void 
testComputeGroupHash() { + long result = Group.computeGroupHash(Map.of( + BAR_TOPIC_NAME, 123L, + FOO_TOPIC_NAME, 456L + )); + + long expected = Hashing.combineOrdered(List.of( + HashCode.fromLong(123L), + HashCode.fromLong(456L) + )).asLong(); + assertEquals(expected, result); + } + + @Test + void testComputeGroupHashWithDifferentOrder() { + long result = Group.computeGroupHash(Map.of( + BAR_TOPIC_NAME, 123L, + FOO_TOPIC_NAME, 456L + )); + + long unexpected = Hashing.combineOrdered(List.of( + HashCode.fromLong(456L), + HashCode.fromLong(123L) + )).asLong(); + assertNotEquals(unexpected, result); + } +}