Skip to content

Commit d474c16

Browse files
committed
✨ feat: upgrade to Kafka 4.x
1 parent 451b865 commit d474c16

File tree

10 files changed

+57
-67
lines changed

10 files changed

+57
-67
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Versions match the version of Confluent Schema Registry they're built against.
1919

2020
| embedded-kafka-schema-registry version | Confluent Schema Registry version | embedded-kafka & Kafka Kafka version | Scala versions | Java version |
2121
|----------------------------------------|-----------------------------------|--------------------------------------|-----------------|--------------|
22+
| 8.0.0 | 8.0.0 | 4.0.x | 2.13, 3.3 | 17+ |
2223
| 7.9.2 | 7.9.2 | 3.9.x | 2,12, 2.13, 3.3 | 17+ |
2324
| 7.9.1 | 7.9.1 | 3.9.x | 2,12, 2.13, 3.3 | 17+ |
2425
| 7.9.0 | 7.9.0 | 3.9.x | 2,12, 2.13, 3.3 | 8+ |
@@ -69,7 +70,7 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
6970
}
7071
```
7172

72-
* In-memory Zookeeper, Kafka, and Schema Registry will be instantiated respectively on port 6000, 6001, and 6002 and automatically shutdown at the end of the test.
73+
* Kafka Broker, Kafka Controller and Schema Registry will be instantiated respectively on port 6001, 6002, and 6003 and automatically shutdown at the end of the test.
7374

7475
## embedded-kafka-schema-registry-streams
7576

build.sbt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ lazy val commonSettings = Seq(
7878
organization := "io.github.embeddedkafka",
7979
scalaVersion := Versions.Scala213,
8080
crossScalaVersions := Seq(
81-
Versions.Scala212,
8281
Versions.Scala213,
8382
Versions.Scala3
8483
)

embedded-kafka-schema-registry/src/main/scala/io/github/embeddedkafka/schemaregistry/EmbeddedKafka.scala

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,7 @@ import io.github.embeddedkafka.schemaregistry.ops.{
77
RunningSchemaRegistryOps,
88
SchemaRegistryOps
99
}
10-
import io.github.embeddedkafka.{
11-
EmbeddedKafkaSupport,
12-
EmbeddedServer,
13-
EmbeddedZ,
14-
ServerOps
15-
}
10+
import io.github.embeddedkafka.{EmbeddedKafkaSupport, EmbeddedServer, ServerOps}
1611

1712
trait EmbeddedKafka
1813
extends EmbeddedKafkaSupport[EmbeddedKafkaConfig]
@@ -30,26 +25,28 @@ trait EmbeddedKafka
3025

3126
override private[embeddedkafka] def withRunningServers[T](
3227
config: EmbeddedKafkaConfig,
33-
actualZkPort: Int,
3428
kafkaLogsDir: Path
3529
)(body: EmbeddedKafkaConfig => T): T = {
36-
val broker =
30+
val (broker, controller) =
3731
startKafka(
3832
config.kafkaPort,
39-
actualZkPort,
33+
config.controllerPort,
4034
config.customBrokerProperties,
4135
kafkaLogsDir
4236
)
43-
val actualKafkaPort = EmbeddedKafka.kafkaPort(broker)
44-
val restApp = startSchemaRegistry(
37+
38+
val actualBrokerPort = EmbeddedKafka.kafkaPort(broker)
39+
val actualControllerPort = EmbeddedKafka.controllerPort(controller)
40+
41+
val restApp = startSchemaRegistry(
4542
config.schemaRegistryPort,
46-
actualKafkaPort,
43+
actualBrokerPort,
4744
config.customSchemaRegistryProperties
4845
)
4946

5047
val configWithUsedPorts = EmbeddedKafkaConfig(
51-
actualKafkaPort,
52-
actualZkPort,
48+
actualBrokerPort,
49+
actualControllerPort,
5350
EmbeddedKafka.schemaRegistryPort(restApp),
5451
config.customBrokerProperties,
5552
config.customProducerProperties,
@@ -61,8 +58,13 @@ trait EmbeddedKafka
6158
body(configWithUsedPorts)
6259
} finally {
6360
restApp.stop()
61+
// In combined mode, we want to shut down the broker first, since the controller may be
62+
// needed for controlled shutdown. Additionally, the controller shutdown process currently
63+
// stops the raft client early on, which would disrupt broker shutdown.
6464
broker.shutdown()
65+
controller.shutdown()
6566
broker.awaitShutdown()
67+
controller.awaitShutdown()
6668
}
6769
}
6870
}
@@ -74,32 +76,29 @@ object EmbeddedKafka
7476
override def start()(
7577
implicit config: EmbeddedKafkaConfig
7678
): EmbeddedKWithSR = {
77-
val zkLogsDir = Files.createTempDirectory("zookeeper-logs")
7879
val kafkaLogsDir = Files.createTempDirectory("kafka-logs")
7980

80-
val factory =
81-
EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
82-
83-
val actualZookeeperPort = zookeeperPort(factory)
84-
val kafkaBroker = startKafka(
81+
val (broker, controller) = startKafka(
8582
kafkaPort = config.kafkaPort,
86-
zooKeeperPort = actualZookeeperPort,
83+
controllerPort = config.controllerPort,
8784
customBrokerProperties = config.customBrokerProperties,
8885
kafkaLogDir = kafkaLogsDir
8986
)
9087

91-
val actualKafkaPort = EmbeddedKafka.kafkaPort(kafkaBroker)
92-
val restApp = EmbeddedSR(
88+
val actualBrokerPort = EmbeddedKafka.kafkaPort(broker)
89+
val actualControllerPort = EmbeddedKafka.controllerPort(controller)
90+
91+
val restApp = EmbeddedSR(
9392
startSchemaRegistry(
9493
config.schemaRegistryPort,
95-
actualKafkaPort,
94+
actualBrokerPort,
9695
config.customSchemaRegistryProperties
9796
)
9897
)
9998

10099
val configWithUsedPorts = EmbeddedKafkaConfigImpl(
101-
kafkaPort = actualKafkaPort,
102-
zooKeeperPort = actualZookeeperPort,
100+
kafkaPort = actualBrokerPort,
101+
controllerPort = actualControllerPort,
103102
schemaRegistryPort = EmbeddedKafka.schemaRegistryPort(restApp.app),
104103
customBrokerProperties = config.customBrokerProperties,
105104
customProducerProperties = config.customProducerProperties,
@@ -108,8 +107,8 @@ object EmbeddedKafka
108107
)
109108

110109
val server = EmbeddedKWithSR(
111-
factory = Option(factory),
112-
broker = kafkaBroker,
110+
broker = broker,
111+
controller = controller,
113112
app = restApp,
114113
logsDirs = kafkaLogsDir,
115114
configWithUsedPorts

embedded-kafka-schema-registry/src/main/scala/io/github/embeddedkafka/schemaregistry/config.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ trait EmbeddedKafkaConfig extends OriginalEmbeddedKafkaConfig {
1111

1212
case class EmbeddedKafkaConfigImpl(
1313
kafkaPort: Int,
14-
zooKeeperPort: Int,
14+
controllerPort: Int,
1515
schemaRegistryPort: Int,
1616
customBrokerProperties: Map[String, String],
1717
customProducerProperties: Map[String, String],
@@ -22,13 +22,13 @@ case class EmbeddedKafkaConfigImpl(
2222
}
2323

2424
object EmbeddedKafkaConfig {
25-
lazy val defaultSchemaRegistryPort = 6002
25+
lazy val defaultSchemaRegistryPort = 6003
2626

2727
implicit val defaultConfig: EmbeddedKafkaConfig = apply()
2828

2929
def apply(
3030
kafkaPort: Int = OriginalEmbeddedKafkaConfig.defaultKafkaPort,
31-
zooKeeperPort: Int = OriginalEmbeddedKafkaConfig.defaultZookeeperPort,
31+
controllerPort: Int = OriginalEmbeddedKafkaConfig.defaultControllerPort,
3232
schemaRegistryPort: Int = defaultSchemaRegistryPort,
3333
customBrokerProperties: Map[String, String] = Map.empty,
3434
customProducerProperties: Map[String, String] = Map.empty,
@@ -37,7 +37,7 @@ object EmbeddedKafkaConfig {
3737
): EmbeddedKafkaConfig =
3838
EmbeddedKafkaConfigImpl(
3939
kafkaPort,
40-
zooKeeperPort,
40+
controllerPort,
4141
schemaRegistryPort,
4242
customBrokerProperties,
4343
customProducerProperties,

embedded-kafka-schema-registry/src/main/scala/io/github/embeddedkafka/schemaregistry/ops/SchemaRegistryOps.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ trait SchemaRegistryOps {
3535

3636
val restAppProperties = Map(
3737
RestConfig.LISTENERS_CONFIG -> s"http://localhost:$actualSchemaRegistryPort",
38-
SchemaRegistryConfig.KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG -> s"localhost:$kafkaPort"
38+
SchemaRegistryConfig.KAFKASTORE_BOOTSTRAP_SERVERS_CONFIG -> s"PLAINTEXT://localhost:$kafkaPort"
3939
) ++ customProperties
4040

4141
val restApp = new SchemaRegistryRestApplication(

embedded-kafka-schema-registry/src/main/scala/io/github/embeddedkafka/schemaregistry/servers.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
package io.github.embeddedkafka.schemaregistry
22

33
import java.nio.file.Path
4-
54
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication
6-
import kafka.server.KafkaServer
7-
import io.github.embeddedkafka.{
8-
EmbeddedServer,
9-
EmbeddedServerWithKafka,
10-
EmbeddedZ
11-
}
5+
import io.github.embeddedkafka.{EmbeddedServer, EmbeddedServerWithKafka}
6+
import kafka.server.{BrokerServer, ControllerServer}
127

138
import scala.reflect.io.Directory
149

@@ -28,19 +23,22 @@ case class EmbeddedSR(app: SchemaRegistryRestApplication)
2823
}
2924

3025
case class EmbeddedKWithSR(
31-
factory: Option[EmbeddedZ],
32-
broker: KafkaServer,
26+
broker: BrokerServer,
27+
controller: ControllerServer,
3328
app: EmbeddedSR,
3429
logsDirs: Path,
3530
config: EmbeddedKafkaConfig
3631
) extends EmbeddedServerWithKafka {
3732
override def stop(clearLogs: Boolean): Unit = {
3833
app.stop()
3934

35+
// In combined mode, we want to shut down the broker first, since the controller may be
36+
// needed for controlled shutdown. Additionally, the controller shutdown process currently
37+
// stops the raft client early on, which would disrupt broker shutdown.
4038
broker.shutdown()
39+
controller.shutdown()
4140
broker.awaitShutdown()
42-
43-
factory.foreach(_.stop(clearLogs))
41+
controller.awaitShutdown()
4442

4543
if (clearLogs) {
4644
val _ = Directory(logsDirs.toFile).deleteRecursively()

embedded-kafka-schema-registry/src/test/scala/io/github/embeddedkafka/schemaregistry/EmbeddedKafkaObjectSpec.scala

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
1414
val firstBroker = EmbeddedKafka.start()(
1515
EmbeddedKafkaConfig(
1616
kafkaPort = 7000,
17-
zooKeeperPort = 7001,
17+
controllerPort = 7001,
1818
schemaRegistryPort = 7002
1919
)
2020
)
2121
EmbeddedKafka.start()(
2222
EmbeddedKafkaConfig(
2323
kafkaPort = 8000,
24-
zooKeeperPort = 8001,
24+
controllerPort = 8001,
2525
schemaRegistryPort = 8002
2626
)
2727
)
@@ -47,11 +47,11 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
4747
EmbeddedKafka.stop()
4848
}
4949

50-
"start and stop Kafka, Zookeeper, and Schema Registry on different specified ports using an implicit configuration" in {
50+
"start and stop Kafka Broker, Kafka Controller, and Schema Registry on different specified ports using an implicit configuration" in {
5151
implicit val config: EmbeddedKafkaConfig =
5252
EmbeddedKafkaConfig(
5353
kafkaPort = 12345,
54-
zooKeeperPort = 54321,
54+
controllerPort = 54321,
5555
schemaRegistryPort = 13542
5656
)
5757

@@ -66,7 +66,7 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
6666
}
6767

6868
"invoking the isRunning method" should {
69-
"return true when Schema Registry, Kafka, and Zookeeper are all running" in {
69+
"return true when Schema Registry, Kafka Broker, and Kafka Controller are all running" in {
7070
implicit val config: EmbeddedKafkaConfig =
7171
EmbeddedKafkaConfig()
7272

@@ -76,13 +76,10 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
7676
EmbeddedKafka.isRunning shouldBe false
7777
}
7878

79-
"return true when Schema Registry, Kafka, and Zookeeper are all running, if started separately" in {
79+
"return true when Schema Registry, Kafka Broker, and Kafka Controller are all running, if started separately" in {
8080
implicit val config: EmbeddedKafkaConfig =
8181
EmbeddedKafkaConfig()
8282

83-
EmbeddedKafka.startZooKeeper(
84-
Files.createTempDirectory("zookeeper-test-logs")
85-
)
8683
EmbeddedKafka.startKafka(Files.createTempDirectory("kafka-test-logs"))
8784
EmbeddedKafka.startSchemaRegistry
8885

@@ -91,13 +88,10 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
9188
EmbeddedKafka.isRunning shouldBe false
9289
}
9390

94-
"return false when only Kafka and Zookeeper are running" in {
91+
"return false when only Kafka Broker and Kafka Controller are running" in {
9592
implicit val config: EmbeddedKafkaConfig =
9693
EmbeddedKafkaConfig()
9794

98-
EmbeddedKafka.startZooKeeper(
99-
Files.createTempDirectory("zookeeper-test-logs")
100-
)
10195
EmbeddedKafka.startKafka(Files.createTempDirectory("kafka-test-logs"))
10296
EmbeddedKafka.startSchemaRegistry
10397
EmbeddedKafka.stopSchemaRegistry()

embedded-kafka-schema-registry/src/test/scala/io/github/embeddedkafka/schemaregistry/EmbeddedKafkaTraitSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ class EmbeddedKafkaTraitSpec extends EmbeddedKafkaSpecSupport {
2828
}
2929
}
3030

31-
"start and stop Kafka, Zookeeper, and Schema Registry successfully on non-zero ports" in {
31+
"start and stop Kafka Broker, Kafka Controller, and Schema Registry successfully on non-zero ports" in {
3232
val userDefinedConfig =
3333
EmbeddedKafkaConfig(
3434
kafkaPort = 12345,
35-
zooKeeperPort = 12346,
35+
controllerPort = 12346,
3636
schemaRegistryPort = 12347
3737
)
3838

@@ -49,12 +49,12 @@ class EmbeddedKafkaTraitSpec extends EmbeddedKafkaSpecSupport {
4949
private def everyServerIsAvailable(config: EmbeddedKafkaConfig): Assertion = {
5050
expectedServerStatus(config.kafkaPort, Available)
5151
expectedServerStatus(config.schemaRegistryPort, Available)
52-
expectedServerStatus(config.zooKeeperPort, Available)
52+
expectedServerStatus(config.controllerPort, Available)
5353
}
5454

5555
private def noServerIsAvailable(config: EmbeddedKafkaConfig): Assertion = {
5656
expectedServerStatus(config.kafkaPort, NotAvailable)
5757
expectedServerStatus(config.schemaRegistryPort, NotAvailable)
58-
expectedServerStatus(config.zooKeeperPort, NotAvailable)
58+
expectedServerStatus(config.controllerPort, NotAvailable)
5959
}
6060
}

kafka-streams/src/test/scala/io/github/embeddedkafka/schemaregistry/streams/ExampleKafkaStreamsSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class ExampleKafkaStreamsSpec
3131
implicit val config: EmbeddedKafkaConfig =
3232
EmbeddedKafkaConfig(
3333
kafkaPort = 7000,
34-
zooKeeperPort = 7001,
34+
controllerPort = 7001,
3535
schemaRegistryPort = 7002
3636
)
3737

project/Dependencies.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@ object Dependencies {
1313
object Versions {
1414
val Scala3 = "3.3.6"
1515
val Scala213 = "2.13.16"
16-
val Scala212 = "2.12.20"
17-
val EmbeddedKafka = "3.9.1"
16+
val EmbeddedKafka = "4.0.1.1"
1817
val ConfluentPlatform = "8.0.0"
19-
val Slf4j = "1.7.36"
18+
val Slf4j = "2.0.17"
2019
val ScalaTest = "3.2.19"
2120
}
2221

0 commit comments

Comments
 (0)