
Conversation

@FrankYang0529 commented Apr 21, 2025

  • Add com.dynatrace.hash4j:hash4j:0.22.0 to dependencies.
  • Add computeTopicHash to org.apache.kafka.coordinator.group.Utils.
    • If the topic does not exist, return 0.
    • If the topic exists, use streaming XXH3 to compute the topic hash
      from the magic byte, topic id, topic name, number of partitions,
      partition ids, and sorted racks.
  • Add computeGroupHash to org.apache.kafka.coordinator.group.Utils.
    • If the topic map is empty, return 0.
    • If the topic map is not empty, use streaming XXH3 to compute the group
      metadata hash from the topic hashes, sorted by topic name.
  • Add related unit tests (see the sketch after this list).
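
For orientation, here is a minimal sketch of the two helpers described above. It assumes the hash4j HashStream64 API; the committed methods take TopicImage/ClusterImage, handle the missing-topic-returns-0 case via lookup, and their exact signatures may differ.

import com.dynatrace.hash4j.hashing.HashStream64;
import com.dynatrace.hash4j.hashing.Hashing;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HashSketch {
    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;

    // Streaming XXH3 over: magic byte, topic id, topic name, number of
    // partitions, then each partition id followed by its sorted racks.
    static long computeTopicHash(long topicIdMsb, long topicIdLsb, String topicName,
                                 List<List<String>> sortedRacksPerPartition) {
        HashStream64 hasher = Hashing.xxh3_64().hashStream()
            .putByte(TOPIC_HASH_MAGIC_BYTE)
            .putLong(topicIdMsb)
            .putLong(topicIdLsb)
            .putString(topicName)
            .putInt(sortedRacksPerPartition.size());
        for (int partitionId = 0; partitionId < sortedRacksPerPartition.size(); partitionId++) {
            hasher.putInt(partitionId);
            for (String rack : sortedRacksPerPartition.get(partitionId)) {
                // Length prefix avoids ambiguity between adjacent rack strings.
                hasher.putInt(rack.length()).putString(rack);
            }
        }
        return hasher.getAsLong();
    }

    // Combines topic hashes sorted by topic name; an empty map hashes to 0.
    static long computeGroupHash(Map<String, Long> topicHashes) {
        if (topicHashes.isEmpty()) return 0;
        HashStream64 hasher = Hashing.xxh3_64().hashStream();
        new TreeMap<>(topicHashes).values().forEach(hasher::putLong);
        return hasher.getAsLong();
    }
}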

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>, Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>

Signed-off-by: PoAn Yang <payang@apache.org>
github-actions bot added the build (Gradle build or GitHub Actions) label Apr 21, 2025
Signed-off-by: PoAn Yang <payang@apache.org>
github-actions bot added the tools and dependencies (Pull requests that update a dependency file) labels Apr 21, 2025
FrankYang0529 changed the title from "KAFKA-17747: [2/N] Add compute topic and group hash (wip)" to "KAFKA-17747: [2/N] Add compute topic and group hash" Apr 21, 2025
FrankYang0529 requested a review from dajac April 21, 2025 15:52

@dajac left a comment

@FrankYang0529 Thanks for the patch. I have started diving into it. I left some questions to start with.

caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
guava: "com.google.guava:guava:$versions.guava",

Member

This is something that we haven't really discussed in the KIP because it is an implementation detail but we should discuss whether we really want to take a dependency on Guava.

Member Author

I updated the PR to remove Guava. I think we can put all the data into a byte array and use Murmur3 to hash it, so we don't rely on Guava.

Member

Thanks!

While talking to @ijuma about it, he suggested looking into https://github.com/lz4/lz4-java/tree/master/src/java/net/jpountz/xxhash. We get it via lz4 and it is apparently much faster than Murmur3. It may be worth running a few benchmarks to compare them. What do you think?

I also wonder what is the impact of putting all the data to a byte array before hashing it. Do you have thoughts on this?

Member

IIRC, lz4-java is not maintained anymore, so I'm not sure whether maintaining that code is a risk.

> I also wonder what is the impact of putting all the data to a byte array before hashing it. Do you have thoughts on this?

I suggest that EventProcessorThread can leverage GrowableBufferSupplier to reuse the buffer as much as possible. Additionally, Group#computeTopicHash should use ByteBufferOutputStream to generate the byte array, as ByteBufferOutputStream#buffer#array can avoid an extra array copy like ByteArrayOutputStream#toByteArray.

Member

This hash function is used by zstd too. It's pretty safe to rely on it given that lz4 and zstd are the most popular compression algorithms, and we will be supporting them for the foreseeable future.

Which particular implementation we use is a fair question.

Member

> IIRC, lz4-java is not maintained anymore, so I'm not sure whether maintaining that code is a risk.
>
> I also wonder what is the impact of putting all the data to a byte array before hashing it. Do you have thoughts on this?
>
> I suggest that EventProcessorThread can leverage GrowableBufferSupplier to reuse the buffer as much as possible. Additionally, Group#computeTopicHash should use ByteBufferOutputStream to generate the byte array, as ByteBufferOutputStream#buffer#array can avoid an extra array copy like ByteArrayOutputStream#toByteArray.

I agree with using a BufferSupplier in order to reuse buffers. However, EventProcessorThread may be too low-level to hold it. Having it in Shard may be enough.

Member Author

Thanks for the suggestion. Updated benchmark result.

Benchmark                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt     Score    Error  Units
TopicHashBenchmark.testLz4                         10                    10                    3  avgt   15   166.389 ±  1.542  ns/op
TopicHashBenchmark.testLz4                         10                    50                    3  avgt   15   375.660 ±  2.771  ns/op
TopicHashBenchmark.testLz4                         10                   100                    3  avgt   15   636.176 ±  8.305  ns/op
TopicHashBenchmark.testMurmur                      10                    10                    3  avgt   15   238.242 ±  1.664  ns/op
TopicHashBenchmark.testMurmur                      10                    50                    3  avgt   15  1143.583 ±  5.981  ns/op
TopicHashBenchmark.testMurmur                      10                   100                    3  avgt   15  2278.680 ± 29.007  ns/op
TopicHashBenchmark.java
package org.apache.kafka.jmh.metadata;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.image.ClusterDelta;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.streams.state.internals.Murmur3;

import net.jpountz.xxhash.XXHash64;
import net.jpountz.xxhash.XXHashFactory;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta;
import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class TopicHashBenchmark {
    @Param({"10", "50", "100"})
    private int partitionsPerTopic;
    @Param({"3"})
    private int replicationFactor;
    @Param({"10"})
    private int numReplicasPerBroker;

    private byte[] topicBytes;

    @Setup(Level.Trial)
    public void setup() throws IOException {
        TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY);
        for (int i = 0; i < numBrokers; i++) {
            clusterDelta.replay(new RegisterBrokerRecord()
                .setBrokerId(i)
                .setRack(Uuid.randomUuid().toString())
            );
        }
        TopicImage topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get();
        ClusterImage clusterImage = clusterDelta.apply();

        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeByte(0); // magic byte
            dos.writeLong(topicImage.id().hashCode()); // topic ID
            dos.writeUTF(topicImage.name()); // topic name
            dos.writeInt(topicImage.partitions().size()); // number of partitions
            for (int i = 0; i < topicImage.partitions().size(); i++) {
                dos.writeInt(i); // partition id
                List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
                    .mapToObj(clusterImage::broker)
                    .filter(Objects::nonNull)
                    .map(BrokerRegistration::rack)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .sorted()
                    .toList();

                String racks = IntStream.range(0, sortedRacksList.size())
                    .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // Format: "index:value"
                    .collect(Collectors.joining(",")); // Separator between "index:value" pairs
                dos.writeUTF(racks); // sorted racks
            }
            dos.flush();
            topicBytes = baos.toByteArray();
        }
    }

    @Benchmark
    public void testLz4() {
        XXHash64 hash = XXHashFactory.fastestInstance().hash64();
        hash.hash(topicBytes, 0, topicBytes.length, 0);
    }

    @Benchmark
    public void testMurmur() {
        Murmur3.hash64(topicBytes);
    }
}

Member

> I agree with using a BufferSupplier in order to reuse buffers. However, EventProcessorThread may be too low-level to hold it. Having it in Shard may be enough.

We can revisit this when the critical code is used in production :)

@FrankYang0529 thanks for the updates. The results LGTM.

Member Author

> I also wonder what is the impact of putting all the data to a byte array before hashing it. Do you have thoughts on this?

Based on KIP-1101, it minimizes how many times a topic hash is calculated, since the result can be shared between groups. I think we can keep this function simple for now.

> I suggest that EventProcessorThread can leverage GrowableBufferSupplier to reuse buffer as much as possible.

With a BufferSupplier, the hash function needs to be thread-safe to reuse the buffer. We can revisit it in the future.

> Additionally, Group#computeTopicHash should use ByteBufferOutputStream to generate the byte array, as ByteBufferOutputStream#buffer#array can avoid an extra array copy like ByteArrayOutputStream#toByteArray.

The ByteBufferOutputStream needs a ByteBuffer with a capacity. I wonder whether we can calculate an accurate capacity. For example, a rack string can contain any character, which takes 1 to 4 bytes in UTF-8.

Member

> The ByteBufferOutputStream needs a ByteBuffer with a capacity. I wonder whether we can calculate an accurate capacity. For example, a rack string can contain any character, which takes 1 to 4 bytes in UTF-8.

The initial capacity can be discussed later. In fact, it may not be an issue if we adopt a growable buffer; the buffer eventually becomes big enough for each hash computation.

* @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
* @return The hash of the group.
*/
static long computeGroupHash(Map<String, Long> topicHashes) {

Member

I wonder whether it is worth inlining the implementation from Guava or something similar to combine the hashes. It would avoid the extra collections. I am not sure whether it makes a real difference though. What are your thoughts?

Member

In the KIP, you also mentioned combining the index with the hash. Is this something done within combineOrdered?

@FrankYang0529 (Member Author) commented Apr 28, 2025

> I wonder whether it is worth inlining the implementation from Guava or something similar to combine the hashes. It would avoid the extra collections.

Yes, I copied some of the implementation into this function (see the sketch below).

> In the KIP, you also mentioned combining the index with the hash. Is this something done within combineOrdered?

No, computeGroupHash sorts topics by name and uses this order to merge hashes. I also added the test cases testComputeGroupHashWithDifferentOrder and testComputeGroupHashWithSameKeyButDifferentValue to verify it.
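
For reference, a from-memory sketch of the Guava-style ordered combine being discussed (in the spirit of Hashing.combineOrdered plus the BytesHashCode#asLong conversion; the PR later moved to streaming XXH3, so this reflects only the intermediate approach, and orderedHashes is a java.util.List):

private static long combineOrdered(List<Long> orderedHashes) {
    byte[] resultBytes = new byte[8];
    for (long next : orderedHashes) {
        for (int i = 0; i < 8; i++) {
            // Order-sensitive mix: swapping two inputs changes the result.
            resultBytes[i] = (byte) (resultBytes[i] * 37 ^ (byte) (next >>> (8 * i)));
        }
    }
    // Little-endian bytes-to-long conversion, as in Guava's BytesHashCode#asLong.
    long retVal = resultBytes[0] & 0xFF;
    for (int i = 1; i < 8; i++) {
        retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
    }
    return retVal;
}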

.filter(Optional::isPresent)
.map(Optional::get)
.sorted()
.collect(Collectors.joining(";"));

Member

; is allowed in the rack field too, so it doesn't really protect us (e.g. a single rack "a;b" and two racks "a" and "b" would both serialize to "a;b").

Member Author

It looks like any character can be valid. I changed the combination to the following format:

0:<rack 0>,1:<rack 1>, ...

@squah-confluent (Contributor) commented Apr 30, 2025

I think this is fine for preventing accidental collisions. It's still possible to intentionally come up with rack names that create collisions, but I believe you'd only be impacting your own cluster.

To rule out any ambiguity, we could pretend this was a serialization format and either prefix strings with their length or null-terminate them. The same goes for variable-length lists of strings: these can either be length-prefixed or terminated with an invalid string that cannot occur (""? but I'm not sure on this). A small illustration follows.
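
To make the ambiguity concrete, a small self-contained illustration with hypothetical rack names (the length-prefix write mirrors the direction the discussion settled on):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

class RackSeparatorAmbiguity {
    public static void main(String[] args) throws IOException {
        // Two different rack lists collapse to the same separator-joined string.
        String a = String.join(",", List.of("r1,", "r2")); // "r1,,r2"
        String b = String.join(",", List.of("r1", ",r2")); // "r1,,r2"
        System.out.println(a.equals(b)); // true: a separator-based encoding collides

        // Length-prefixing each rack makes the encoding unambiguous.
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
            for (String rack : List.of("r1,", "r2")) {
                dos.writeInt(rack.length()); // length prefix, written in binary
                dos.writeUTF(rack);          // rack payload
            }
        }
    }
}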

Contributor

Thanks for taking the suggestion. I think it's fine now. Small nit though: I was actually thinking of writing the length in binary, using writeInt, and dropping the : and , separators entirely. Apologies if I wasn't clear enough earlier.

.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
.putInt(topicImage.partitions().size()); // number of partitions

topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {

Member

We know that partitions go from 0 to N. I wonder whether we should use a good old for loop instead of sorting the partitions. What do you think?

Member Author

Good suggestion! Thanks. Updated it.

static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
.putByte((byte) 0) // magic byte

Member

Should we define a constant for the magic byte?

Member Author

Yes, added TOPIC_HASH_MAGIC_BYTE.

Comment on lines 228 to 229
byte TOPIC_HASH_MAGIC_BYTE = 0x00;
XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();

Member

Would it be possible to put those and the new methods in a separate class? Having them in Group is weird because they won't be used by all the group types.

Member Author

Moved them to org.apache.kafka.coordinator.group.Utils. Thanks.

@chia7712 left a comment

@FrankYang0529 thanks for this patch.

* @return The hash of the topic.
*/
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();

Member

We can do a small optimization for it by using ByteBufferOutputStream. For example:

        try (var baos = new ByteBufferOutputStream(100);
             var dos = new DataOutputStream(baos)) {
            ...
            dos.flush();
            var topicBytes = baos.buffer().flip();
            return LZ4_HASH_INSTANCE.hash(topicBytes, 0);
        }

LZ4_HASH_INSTANCE.hash can take a ByteBuffer to compute the hash, which avoids an array copy.

Member Author

Sorry, I misunderstood ByteBufferOutputStream. I thought it used a fixed capacity even when the buffer was not big enough. After checking the source code, I see it expands the buffer when it is not big enough. Updated it. Thanks.

}

/**
* Computes the hash of the topic id, name, number of partitions, and partition racks by Murmur3.

Member

please update the docs

Member Author

Updated it. Thanks.

}
});

// Convert the byte array to long. This is taken from guava BytesHashCode#asLong.

Member

Why not use LZ4_HASH_INSTANCE?

Member Author

Yes, we can use it. Thanks.

.sorted()
.toList();

String racks = IntStream.range(0, sortedRacksList.size())

Member

The KIP does not mention the "index" for the rack. Could it be replaced by String.join(",", sortedRacksList)?

Member Author

There is no restriction on the rack string, so any character can be part of it. I can update the KIP if needed.

* @param clusterImage The cluster image.
* @return The hash of the topic.
*/
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {

Member

Please add documentation to remind developers that the hash is stored as part of the state. Changing the implementation of the hashing function may break compatibility with existing states.

Contributor

If the hashing function is ever changed, is there a version field that should be updated?

Member

> If the hashing function is ever changed, is there a version field that should be updated?

Yes, there is a magic byte that serves as the version.

.sorted(Map.Entry.comparingByKey()) // sort by topic name
.map(Map.Entry::getValue)
.map(longToBytes::apply)
.forEach(nextBytes -> {

Member

I think we're adding a lot of unnecessary overhead for the hash computation (multiple map calls, etc.). We should probably just use an old-school loop, e.g. the sketch below.
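
A hedged sketch of that loop, written against the streaming-XXH3 shape the PR eventually adopted (illustrative fragment, not the committed code; assumes the hash4j API):

static long computeGroupHash(Map<String, Long> topicHashes) {
    if (topicHashes.isEmpty()) return 0;
    // Sort topic names once, then stream each hash in order: no intermediate
    // map/longToBytes/forEach pipeline and no extra collections.
    String[] sortedNames = topicHashes.keySet().toArray(new String[0]);
    Arrays.sort(sortedNames);
    HashStream64 hasher = Hashing.xxh3_64().hashStream();
    for (String name : sortedNames) {
        hasher.putLong(topicHashes.get(name));
    }
    return hasher.getAsLong();
}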

.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.addRacks()
.build();
private static final XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();

Member

XXH3 seems to be the fastest implementation. Did we consider using that?

https://github.com/Cyan4973/xxHash

Member Author

Thanks for the suggestion. I benchmarked streaming XXH3 / streaming XXH64 / non-streaming XXH3 / non-streaming XXH64. Streaming XXH3 gets the best result. However, it requires a new library, com.dynatrace.hash4j. Do we want to add it?

cc @chia7712 @dajac

Benchmark                                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt      Score      Error  Units
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                    10                    3  avgt    5    879.241 ±    6.788  ns/op
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                    50                    3  avgt    5   4192.380 ±  195.424  ns/op
TopicHashBenchmark.testDynatraceStreamingXXH3                      10                   100                    3  avgt    5   8027.227 ±  210.403  ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                    10                    3  avgt    5   1676.398 ±    2.249  ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                    50                    3  avgt    5   9256.175 ±   45.298  ns/op
TopicHashBenchmark.testDynatraceXXH3                               10                   100                    3  avgt    5  20195.772 ±   37.651  ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                    10                    3  avgt    5   9739.833 ±  188.303  ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                    50                    3  avgt    5  45540.195 ±  455.747  ns/op
TopicHashBenchmark.testLz4StreamingXXHash64                        10                   100                    3  avgt    5  89084.689 ± 2164.862  ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                    10                    3  avgt    5   1755.391 ±    6.436  ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                    50                    3  avgt    5   9421.643 ±   79.838  ns/op
TopicHashBenchmark.testLz4XXHash64                                 10                   100                    3  avgt    5  19461.960 ±  425.881  ns/op
JMH benchmarks done
TopicHashBenchmark.java
package org.apache.kafka.jmh.metadata;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.image.ClusterDelta;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.BrokerRegistration;

import com.dynatrace.hash4j.hashing.HashStream64;
import com.dynatrace.hash4j.hashing.Hashing;

import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHashFactory;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;


import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta;
import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class TopicHashBenchmark {
    @Param({"10", "50", "100"})
    private int partitionsPerTopic;
    @Param({"3"})
    private int replicationFactor;
    @Param({"10"})
    private int numReplicasPerBroker;

    private TopicImage topicImage;
    private ClusterImage clusterImage;

    @Setup(Level.Trial)
    public void setup() throws IOException {
        TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY);
        for (int i = 0; i < numBrokers; i++) {
            clusterDelta.replay(new RegisterBrokerRecord()
                .setBrokerId(i)
                .setRack(Uuid.randomUuid().toString())
            );
        }
        topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get();
        clusterImage = clusterDelta.apply();
    }

    @Benchmark
    public void testLz4StreamingXXHash64() {
        try (StreamingXXHash64 hash = XXHashFactory.fastestInstance().newStreamingHash64(0)) {
            hash.update(new byte[]{(byte) 0}, 0, 1); // magic byte

            // topic id
            hash.update(intToBytes(topicImage.id().hashCode()), 0, 4); // intToBytes returns a 4-byte array

            // topic name
            byte[] topicNameBytes = topicImage.name().getBytes();
            hash.update(topicNameBytes, 0, topicNameBytes.length);

            // number of partitions
            hash.update(intToBytes(topicImage.partitions().size()), 0, 4);

            for (int i = 0; i < topicImage.partitions().size(); i++) {
                // partition id
                hash.update(intToBytes(i), 0, 4);

                // sorted racks
                List<String> racks = new ArrayList<String>();
                for (int replicaId : topicImage.partitions().get(i).replicas) {
                    BrokerRegistration broker = clusterImage.broker(replicaId);
                    if (broker != null) {
                        Optional<String> rackOptional = broker.rack();
                        rackOptional.ifPresent(racks::add);
                    }
                }

                Collections.sort(racks);
                for (String rack : racks) {
                    // Format: "<length><value>"
                    byte[] rackBytes = rack.getBytes();
                    hash.update(intToBytes(rack.length()), 0, 4);
                    hash.update(rackBytes, 0, rackBytes.length);
                }
            }
            hash.getValue();
        }
    }

    @Benchmark
    public void testLz4XXHash64() throws IOException {
        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
             DataOutputStream dos = new DataOutputStream(bbos)) {
            dos.writeByte(0); // magic byte
            dos.writeLong(topicImage.id().hashCode()); // topic ID
            dos.writeUTF(topicImage.name()); // topic name
            dos.writeInt(topicImage.partitions().size()); // number of partitions
            for (int i = 0; i < topicImage.partitions().size(); i++) {
                dos.writeInt(i); // partition id
                // The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
                // If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
                // Add length before the rack string to avoid the edge case.
                List<String> racks = new ArrayList<>();
                for (int replicaId : topicImage.partitions().get(i).replicas) {
                    BrokerRegistration broker = clusterImage.broker(replicaId);
                    if (broker != null) {
                        Optional<String> rackOptional = broker.rack();
                        rackOptional.ifPresent(racks::add);
                    }
                }

                Collections.sort(racks);
                for (String rack : racks) {
                    // Format: "<length><value>"
                    dos.writeInt(rack.length());
                    dos.writeUTF(rack);
                }
            }
            dos.flush();
            ByteBuffer topicBytes = bbos.buffer().flip();
            XXHashFactory.fastestInstance().hash64().hash(topicBytes, 0);
        }
    }

    @Benchmark
    public void testDynatraceStreamingXXH3() {
        HashStream64 hash = Hashing.xxh3_64().hashStream();
        hash = hash.putByte((byte) 0)
            .putLong(topicImage.id().hashCode())
            .putString(topicImage.name())
            .putInt(topicImage.partitions().size());

        for (int i = 0; i < topicImage.partitions().size(); i++) {
            // partition id
            hash = hash.putInt(i);

            // sorted racks
            List<String> racks = new ArrayList<String>();
            for (int replicaId : topicImage.partitions().get(i).replicas) {
                BrokerRegistration broker = clusterImage.broker(replicaId);
                if (broker != null) {
                    Optional<String> rackOptional = broker.rack();
                    rackOptional.ifPresent(racks::add);
                }
            }

            Collections.sort(racks);
            for (String rack : racks) {
                // Format: "<length><value>"
                hash.putInt(rack.length());
                hash.putString(rack);
            }
        }
        hash.getAsLong();
    }

    @Benchmark
    public void testDynatraceXXH3() throws IOException {
        try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
             DataOutputStream dos = new DataOutputStream(bbos)) {
            dos.writeByte(0); // magic byte
            dos.writeLong(topicImage.id().hashCode()); // topic ID
            dos.writeUTF(topicImage.name()); // topic name
            dos.writeInt(topicImage.partitions().size()); // number of partitions
            for (int i = 0; i < topicImage.partitions().size(); i++) {
                dos.writeInt(i); // partition id
                // The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
                // If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
                // Add length before the rack string to avoid the edge case.
                List<String> racks = new ArrayList<>();
                for (int replicaId : topicImage.partitions().get(i).replicas) {
                    BrokerRegistration broker = clusterImage.broker(replicaId);
                    if (broker != null) {
                        Optional<String> rackOptional = broker.rack();
                        rackOptional.ifPresent(racks::add);
                    }
                }

                Collections.sort(racks);
                for (String rack : racks) {
                    // Format: "<length><value>"
                    dos.writeInt(rack.length());
                    dos.writeUTF(rack);
                }
            }
            dos.flush();
            ByteBuffer topicBytes = bbos.buffer().flip();
            Hashing.xxh3_64().hashBytesToLong(topicBytes.array(), 0, topicBytes.limit()); // hash only the written bytes, not the whole backing array
        }
    }

    private byte[] intToBytes(int value) {
        return new byte[] {
            (byte)(value >>> 24),
            (byte)(value >>> 16),
            (byte)(value >>> 8),
            (byte)value
        };
    }
}

Member

Using streaming XXH3 seems pretty interesting to me given the results. Is com.dynatrace.hash4j the only way to get it? The library seems reasonable to take as a dependency on the server.

Member Author

From https://xxhash.com/, there are three Java libraries. Only zero-allocation-hashing and hash4j provide XXH3, and only hash4j has a streaming hash class.

Member

Thanks. I am fine with taking hash4j as a dependency. It is a small one without too much risk but let's see what @ijuma and @chia7712 think about it too.

Member

It looks like a high-quality library from their description of why they didn't use existing ones: https://www.dynatrace.com/news/blog/hash4j-new-library-java/

Member

If this is a server-side dependency, it seems fine. If it's also client-side, we probably need to think more about it.

Member

I confirm that it is server-side only.

Member Author

I will update the PR to use hash4j with streaming XXH3. Thanks.

}
dos.flush();
ByteBuffer topicBytes = bbos.buffer().flip();
return LZ4_HASH_INSTANCE.hash(topicBytes, 0);

Member

There is also a streaming hash class - would that be a better option instead of creating the complete byte buffer?

Member

@FrankYang0529 Have you had a chance to look into the streaming hash class?

Member

Sorry, I just saw your comment here: #19523 (comment)

@dajac left a comment

@FrankYang0529 Thanks for the update. The patch looks pretty good to me. I left some suggestions for consideration.

build.gradle (outdated)
implementation project(':coordinator-common')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.lz4

Member

Do we need this change?

Member Author

Removed it. Thanks.

implementation libs.hdrHistogram
implementation libs.re2j
implementation libs.slf4jApi
implementation libs.hash4j

Member

I suppose that we need to update the license file too.

Member Author

Yes, I added it to LICENSE-binary, and python committer-tools/verify_license.py passes. Do I need to modify other files? Thanks.

Member

This is fine. Thanks.

// The rack string combination cannot use simple separator like ",", because there is no limitation for rack character.
// If using simple separator like "," it may hit edge case like ",," and ",,," / ",,," and ",,".
// Add length before the rack string to avoid the edge case.
List<String> racks = new ArrayList<>();

Member

nit: I wonder whether we could reuse this list. We could declare it before the loop and clear it before using it. What do you think?

Member Author

Yes, I initialized racks outside the for-loop and call clear() before adding data, roughly as in the fragment below.
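
Roughly like this (an illustrative fragment based on the benchmark code above, not the exact committed diff):

List<String> racks = new ArrayList<>(); // declared once, outside the partition loop
for (int i = 0; i < topicImage.partitions().size(); i++) {
    hasher.putInt(i);
    racks.clear(); // reuse the same list across partitions
    for (int replicaId : topicImage.partitions().get(i).replicas) {
        BrokerRegistration broker = clusterImage.broker(replicaId);
        if (broker != null) broker.rack().ifPresent(racks::add);
    }
    Collections.sort(racks);
    for (String rack : racks) {
        hasher.putInt(rack.length()).putString(rack);
    }
}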

Comment on lines 399 to 400
Optional<String> rackOptional = broker.rack();
rackOptional.ifPresent(racks::add);

Member

nit: We could combine those two lines into one.

Member Author

Updated it.

*/
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
HashStream64 hasher = Hashing.xxh3_64().hashStream();
hasher = hasher.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte

Member

nit: Could you please put .putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte on a new line. It is easier to read if it is aligned with the other ones.

Member Author

Updated it.

hasher = hasher.putInt(rack.length()).putString(rack);
}
}
return hasher.getAsLong();

Member

nit: Should we put an empty line before the return?

Member Author

Updated it.

* @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
* @return The hash of the group.
*/
static long computeGroupHash(Map<String, Long> topicHashes) {

Member

nit: public?

Member Author

IIUC, this function will only be used in GroupMetadataManager like throwIfRegularExpressionIsInvalid. I think we can add public when we need it. WDYT?

Member

Ok. It works for me.

* @param clusterImage The cluster image.
* @return The hash of the topic.
*/
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {

Member

nit: public?

Member Author

Same as above.


@Test
void testComputeTopicHash() throws IOException {
long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

Member

From a usage perspective, I wonder whether we should just pass the topic name and the metadata image to the method to compute the hash. I suppose that we will use it like this in the end but I am not 100% sure. What do you think?

Member Author

If the input is a topic name and a metadata image, we need to handle a null TopicImage in computeTopicHash. Do we want to handle that error there?

Member

I wonder if we could return 0 when the topic does not exist. Would it work?

Member Author

Yes, I think it works. I also check whether the topic map is empty in computeGroupHash; if it is, return 0. A sketch of the lookup follows.
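
A sketch of what that lookup could look like (hypothetical wrapper; assumes TopicsImage#getTopic(String) resolves the name):

static long computeTopicHash(String topicName, MetadataImage metadataImage) throws IOException {
    TopicImage topicImage = metadataImage.topics().getTopic(topicName);
    if (topicImage == null) return 0; // topic does not exist
    return computeTopicHash(topicImage, metadataImage.cluster());
}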

Comment on lines 156 to 161
Arguments.of(new MetadataImageBuilder() // different topic id
.addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
.addRacks()
.build(),
differentTopicId
),

Member

nit: Could you please put new MetadataImageBuilder() // different topic id on a new line too?

            Arguments.of(
                new MetadataImageBuilder() // different topic id
                    .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
                    .addRacks()
                    .build(),
                differentTopicId
            ),

Member Author

Updated it. Thanks.

@dajac commented May 14, 2025

@FrankYang0529 Thanks for the update. Could you please also update the description of the PR?

@dajac left a comment

Made another pass. I only have a few minor comments. It looks pretty good to me.

HashStream64 hasher = Hashing.xxh3_64().hashStream();
hasher = hasher
.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte
.putLong(topicImage.id().hashCode()) // topic ID

Member

I wonder about this one. It may be better to hash it as two longs (getMostSignificantBits and getLeastSignificantBits). What do you think?

Member Author

Thanks for the suggestion. This way the whole Uuid goes into the topic hash, which is better; see the sketch below.
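
A sketch of the adjusted prefix (Kafka's Uuid exposes getMostSignificantBits()/getLeastSignificantBits(), so the full 128 bits feed the hash instead of the 32-bit hashCode):

HashStream64 hasher = Hashing.xxh3_64().hashStream()
    .putByte(TOPIC_HASH_MAGIC_BYTE)                      // magic byte
    .putLong(topicImage.id().getMostSignificantBits())   // topic ID, high 64 bits
    .putLong(topicImage.id().getLeastSignificantBits())  // topic ID, low 64 bits
    .putString(topicImage.name())                        // topic name
    .putInt(topicImage.partitions().size());             // number of partitions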


HashStream64 hasher = Hashing.xxh3_64().hashStream();
hasher = hasher
.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte

Member

I wonder whether we should remove the inline comments, as the code is pretty self-explanatory and you already explain the format in the Javadoc.

@dajac left a comment

lgtm, thanks.

dajac merged commit a1008dc into apache:trunk May 15, 2025
25 checks passed
FrankYang0529 deleted the KAFKA-17747-2 branch May 15, 2025 12:03