Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set replication factor for offsets topic #588

Merged
merged 2 commits into from
Oct 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ sudo: false
services:
- docker

before_install:
# upgrade to a later docker-compose which supports services.kafka.scale
- sudo rm /usr/local/bin/docker-compose
- curl -L https://github.com/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin

script:
- sbt -jvm-opts .jvmopts-travis "$CMD"

Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ lazy val tests = project
dockerComposeTestCommandOptions := {
import com.github.ehsanyou.sbt.docker.compose.commands.test._
DockerComposeTestCmd(DockerComposeTest.ItTest)
.withOption("--scale", s"kafka=${kafkaScale.value}")
.withEnvVar("KAFKA_SCALE", kafkaScale.value.toString)
}
)

Expand Down Expand Up @@ -244,7 +244,7 @@ lazy val benchmarks = project
dockerComposeTestCommandOptions := {
import com.github.ehsanyou.sbt.docker.compose.commands.test._
DockerComposeTestCmd(DockerComposeTest.ItTest)
.withOption("--scale", s"kafka=${kafkaScale.value}")
.withEnvVar("KAFKA_SCALE", kafkaScale.value.toString)
},
dockerfile in docker := {
val artifact: File = assembly.value
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# See dockerhub for different versions of kafka and zookeeper
# https://hub.docker.com/r/wurstmeister/kafka/
# https://hub.docker.com/r/wurstmeister/zookeeper/
version: '2'
version: '2.2'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
scale: ${KAFKA_SCALE:-1}
image: wurstmeister/kafka:1.1.0
ports:
- "9094"
Expand All @@ -20,5 +21,6 @@ services:
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 1 # default was 300 (5 minutes)
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: ${KAFKA_SCALE:-1}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.4")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.3.0")
addSbtPlugin("lt.dvim.paradox" % "sbt-paradox-local" % "0.2")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
// latest version with https://github.com/ehsanyou/sbt-docker-compose/pull/3
addSbtPlugin("com.github.ehsanyou" % "sbt-docker-compose" % "926a4d83")
// latest version with https://github.com/ehsanyou/sbt-docker-compose/pull/10
addSbtPlugin("com.github.ehsanyou" % "sbt-docker-compose" % "67284e73-envvars-2m")
resolvers += Resolver.bintrayIvyRepo("2m", "sbt-plugins")
7 changes: 5 additions & 2 deletions tests/src/it/scala/akka/kafka/PlainSourceFailoverSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,22 @@ class PlainSourceFailoverSpec extends ScalatestKafkaSpec(PlainSourceFailoverSpec
.withProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") // default was 5 * 60 * 1000 (five minutes)

val consumer = Consumer.plainSource(consumerConfig, Subscriptions.topics(topic))
.take(totalMessages)
.scan(0)((c, _) => c + 1)
.map { i =>
if (i % 1000 == 0) log.info(s"Received [$i] messages so far.")
i
}
.map(Some.apply)
.keepAlive(maxIdle = 10.seconds, () => None)
.takeWhile(_.isDefined)
.runWith(Sink.last)
.map(_.get)

waitUntilConsumerSummary(groupId, timeout = 5.seconds) {
case singleConsumer :: Nil => singleConsumer.assignment.size == partitions
}

val result = Source(0 to totalMessages)
val result = Source(1 to totalMessages)
.map { i =>
if (i % 1000 == 0) log.info(s"Sent [$i] messages so far.")
i.toString
Expand Down