Skip to content

Commit

Permalink
[COMMON] update kafka from 3.5.1 to 3.7.0 (#1845)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Mar 8, 2024
1 parent 35aff76 commit 4bc880b
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 11 deletions.
14 changes: 14 additions & 0 deletions common/src/main/java/org/astraea/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ public static Object member(Object object, String attribute) {
return reflectionAttribute(object.getClass(), object, attribute);
}

public static Object method(Object object, String name) {
return reflectionMethod(object.getClass(), object, name);
}

/**
* reflection class attribute
*
Expand All @@ -415,6 +419,16 @@ private static Object reflectionAttribute(Class<?> clz, Object object, String at
throw new RuntimeException(attribute + " is not existent in " + object.getClass().getName());
}

private static Object reflectionMethod(Class<?> clz, Object object, String attribute) {
try {
var method = clz.getDeclaredMethod(attribute);
method.setAccessible(true);
return method.invoke(object);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static <T> List<T> constants(
Class<?> clz, Predicate<String> variableNameFilter, Class<T> cast) {
return Arrays.stream(clz.getFields())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ protected abstract static class BaseConsumer<Key, Value> implements Consumer<Key
public BaseConsumer(org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
// KafkaConsumer does not expose client-id
this.clientId = (String) Utils.member(kafkaConsumer, "clientId");
this.clientId = (String) Utils.method(kafkaConsumer, "clientId");
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions docker/start_broker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ source $DOCKER_FOLDER/docker_build_common.sh
# ===============================[global variables]===============================
declare -r ACCOUNT=${ACCOUNT:-skiptests}
declare -r KAFKA_ACCOUNT=${KAFKA_ACCOUNT:-apache}
declare -r VERSION=${REVISION:-${VERSION:-3.5.1}}
declare -r VERSION=${REVISION:-${VERSION:-3.7.0}}
declare -r DOCKERFILE=$DOCKER_FOLDER/broker.dockerfile
declare -r DATA_FOLDER_IN_CONTAINER_PREFIX="/tmp/log-folder"
declare -r EXPORTER_VERSION="0.16.1"
Expand Down Expand Up @@ -61,7 +61,7 @@ function showHelp() {
echo " ACCOUNT=skiptests set the github account for astraea repo"
echo " HEAP_OPTS=\"-Xmx2G -Xms2G\" set broker JVM memory"
echo " REVISION=trunk set revision of kafka source code to build container"
echo " VERSION=3.5.1 set version of kafka distribution"
echo " VERSION=3.7.0 set version of kafka distribution"
echo " BUILD=false set true if you want to build image locally"
echo " RUN=false set false if you want to build/pull image only"
echo " DATA_FOLDERS=/tmp/folder1 set host folders used by broker"
Expand Down
4 changes: 2 additions & 2 deletions docker/start_controller.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ source $DOCKER_FOLDER/docker_build_common.sh
# ===============================[global variables]===============================
declare -r ACCOUNT=${ACCOUNT:-skiptests}
declare -r KAFKA_ACCOUNT=${KAFKA_ACCOUNT:-apache}
declare -r VERSION=${REVISION:-${VERSION:-3.5.1}}
declare -r VERSION=${REVISION:-${VERSION:-3.7.0}}
declare -r DOCKERFILE=$DOCKER_FOLDER/controller.dockerfile
declare -r EXPORTER_VERSION="0.16.1"
declare -r CLUSTER_ID=${CLUSTER_ID:-"$(randomString)"}
Expand Down Expand Up @@ -52,7 +52,7 @@ function showHelp() {
echo " ACCOUNT=skiptests set the github account for astraea repo"
echo " HEAP_OPTS=\"-Xmx2G -Xms2G\" set controller JVM memory"
echo " REVISION=trunk set revision of kafka source code to build container"
echo " VERSION=3.5.1 set version of kafka distribution"
echo " VERSION=3.7.0 set version of kafka distribution"
echo " BUILD=false set true if you want to build image locally"
echo " RUN=false set false if you want to build/pull image only"
echo " META_FOLDER=/tmp/folder1 set host folder used by controller"
Expand Down
4 changes: 2 additions & 2 deletions docker/start_worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ source $DOCKER_FOLDER/docker_build_common.sh
# ===============================[global variables]===============================
declare -r ACCOUNT=${ACCOUNT:-skiptests}
declare -r KAFKA_ACCOUNT=${KAFKA_ACCOUNT:-apache}
declare -r VERSION=${REVISION:-${VERSION:-3.5.1}}
declare -r VERSION=${REVISION:-${VERSION:-3.7.0}}
declare -r DOCKERFILE=$DOCKER_FOLDER/worker.dockerfile
declare -r WORKER_PORT=${WORKER_PORT:-"$(getRandomPort)"}
declare -r CONTAINER_NAME="worker-$WORKER_PORT"
Expand Down Expand Up @@ -50,7 +50,7 @@ function showHelp() {
echo " ACCOUNT=skiptests set the github account for astraea repo"
echo " HEAP_OPTS=\"-Xmx2G -Xms2G\" set worker JVM memory"
echo " REVISION=trunk set revision of kafka source code to build container"
echo " VERSION=3.5.1 set version of kafka distribution"
echo " VERSION=3.7.0 set version of kafka distribution"
echo " BUILD=false set true if you want to build image locally"
echo " RUN=false set false if you want to build/pull image only"
echo " WORKER_PLUGIN_PATH=/tmp/worker-plugins set plugin path to kafka worker"
Expand Down
3 changes: 2 additions & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def versions = [
"jmh-core" : project.properties['jmh-core.version'] ?: "1.36",
"jmh-generator-annprocess": project.properties['jmh-generator.version'] ?: "1.36",
junit : project.properties['junit.version'] ?: "5.9.3",
kafka : project.properties['kafka.version'] ?: "3.5.1",
kafka : project.properties['kafka.version'] ?: "3.7.0",
mockito : project.properties['mockito.version'] ?: "5.4.0",
"mockito-inline" : project.properties['mockito.version'] ?: "5.2.0",
scala : project.properties['scala.version'] ?: "2.13.11",
Expand All @@ -57,6 +57,7 @@ libs += [
"kafka-connect-runtime" : "org.apache.kafka:connect-runtime:${versions["kafka"]}",
"kafka-core" : "org.apache.kafka:kafka_2.13:${versions["kafka"]}",
"kafka-server-common" : "org.apache.kafka:kafka-server-common:${versions["kafka"]}",
"kafka-metadata" : "org.apache.kafka:kafka-metadata:${versions["kafka"]}",
"mockito-core" : "org.mockito:mockito-core:${versions["mockito"]}",
"mockito-inline" : "org.mockito:mockito-inline:${versions["mockito-inline"]}",
scala : "org.scala-lang:scala-library:${versions["scala"]}",
Expand Down
1 change: 1 addition & 0 deletions it/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation libs["junit"]
implementation libs["kafka-core"]
implementation libs["kafka-server-common"]
implementation libs["kafka-metadata"]
implementation libs["kafka-connect-runtime"]
implementation libs["kafka-connect-json"]
implementation libs["hadoop-minicluster"]
Expand Down
14 changes: 11 additions & 3 deletions it/src/main/java/org/astraea/it/BrokerCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,32 @@
import java.util.stream.IntStream;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.tools.StorageTool;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import org.apache.kafka.server.common.MetadataVersion;

public interface BrokerCluster extends AutoCloseable {

private static CompletableFuture<Map.Entry<Integer, Server>> server(
Map<String, String> configs, Set<String> folders, String clusterId, int nodeId) {

StorageTool.formatCommand(
new PrintStream(new ByteArrayOutputStream()),
scala.collection.JavaConverters.collectionAsScalaIterableConverter(folders)
.asScala()
.toSeq(),
new MetaProperties(clusterId, nodeId),
MetadataVersion.latest(),
new MetaProperties.Builder()
.setVersion(MetaPropertiesVersion.V1)
.setClusterId(clusterId)
.setNodeId(nodeId)
.setDirectoryId(DirectoryId.random())
.build(),
MetadataVersion.latestProduction(),
true);

return CompletableFuture.supplyAsync(
Expand Down

0 comments on commit 4bc880b

Please sign in to comment.