Skip to content

Commit 48f3e61

Browse files
jiminhsiehsimonsouter
authored andcommitted
Fix compiler warning & thread was not terminated (#137)
* Remove unused input parameter Kafka Consumer Configs didn't have configurations related to topic. * Fix thread was not terminated correctly.
1 parent 0f690e1 commit 48f3e61

File tree

2 files changed

+7
-5
lines changed

2 files changed

+7
-5
lines changed

akka/src/test/scala/cakesolutions/kafka/akka/KafkaConsumerActorSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
5454
""".stripMargin)
5555
)
5656

57-
def configuredActor(topic: String): Config =
57+
def configuredActor: Config =
5858
ConfigFactory.parseString(
5959
s"""
6060
| bootstrap.servers = "localhost:$kafkaPort",
@@ -98,7 +98,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
9898
producer.flush()
9999

100100
// Consumer and actor config in same config file
101-
val consumer = system.actorOf(KafkaConsumerActor.props(configuredActor(topic), new StringDeserializer(), new StringDeserializer(), testActor))
101+
val consumer = system.actorOf(KafkaConsumerActor.props(configuredActor, new StringDeserializer(), new StringDeserializer(), testActor))
102102
consumer ! Subscribe.AutoPartition(List(topic))
103103

104104
val rs = expectMsgClass(30.seconds, classOf[ConsumerRecords[String, String]])
@@ -120,7 +120,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
120120
val downstreamActor = TestProbe().ref
121121

122122
// Consumer and actor config in same config file
123-
val consumer = system.actorOf(KafkaConsumerActor.props(configuredActor(topic), new StringDeserializer(), new StringDeserializer(), downstreamActor))
123+
val consumer = system.actorOf(KafkaConsumerActor.props(configuredActor, new StringDeserializer(), new StringDeserializer(), downstreamActor))
124124
consumer ! Subscribe.AutoPartition(List(topic))
125125

126126
// Initiate DeathWatch

client/src/test/scala/cakesolutions/kafka/ConfigureSerializationSpec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class ConfigureSerializationSpec extends KafkaIntSpec{
4646
""".stripMargin
4747
), keySerializer, valueSerializer)
4848

49-
val _ = KafkaProducer(conf)
49+
val producer = KafkaProducer(conf)
50+
producer.close
5051

5152
keySerializer.configuration shouldEqual "mock_value"
5253
keySerializer.isKeySerializer shouldEqual true
@@ -66,7 +67,8 @@ class ConfigureSerializationSpec extends KafkaIntSpec{
6667
""".stripMargin
6768
), keyDeserializer, valueDeserializer)
6869

69-
val _ = KafkaConsumer(conf)
70+
val consumer = KafkaConsumer(conf)
71+
consumer.close
7072

7173
keyDeserializer.configuration shouldEqual "mock_value"
7274
keyDeserializer.isKeyDeserializer shouldEqual true

0 commit comments

Comments
 (0)