Skip to content

Conversation

@jukkakarvanen
Copy link
Contributor

Example to Unit Test KafkaStreamsWordCountApplication.WordCountProcessorApplication using TopologyTestDriver. So this is partly related to #110.

This is a better way to unit test stream logic than using EmbeddedKafka. See testing instructions:
https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html

This TopologyTestDriver is now in dependency package org.apache-kafka:kafka-streams-test-utils.
Not seen this referenced in spring-kafka or spring-cloud packages. So now kafka.version property in pom should be kept in sync with spring-kafka-test kafka version.

The type of unused key type changed from Object to Bytes to able to use BytesSerde.

The current example utilization of internal classes causes more complex reference to those classes if those would be normal classes.

I hope someone would invent a way to avoid some of this boilerplate code in this test class example.

Changed non used key type from Object to Bytes to use BytesSerde
Added default contructor to support JsonSerde
Added toString for better test output
@pivotal-issuemaster
Copy link

@jukkakarvanen Please sign the Contributor License Agreement!

Click here to manually synchronize the status of this Pull Request.

See the FAQ for frequently asked questions.

@pivotal-issuemaster
Copy link

@jukkakarvanen Thank you for signing the Contributor License Agreement!

@sobychacko
Copy link
Contributor

@jukkakarvanen Thank you very much for this PR. This has been in the backlog for a while but never got to it. This will be very useful for the community. I will review it soon and merge.

@sobychacko
Copy link
Contributor

@jukkakarvanen I am reviewing the changes now. What do you mean by these statements in the PR description:
The current example utilization of internal classes causes more complex reference to those classes if those would be normal classes.
I hope someone would invent a way to avoid some of this boilerplate code in this test class example."

@jukkakarvanen
Copy link
Contributor Author

@jukkakarvanen I am reviewing the changes now. What do you mean by these statements in the PR description:
The current example utilization of internal classes causes more complex reference to those classes if those would be normal classes.

With this I was meaning because now this Stream logic class WordCountProcessorApplication and also the data model class WordCount are internal class of KafkaStreamsWordCountApplication, the reference are more complex if those would be normal classes. See for example lines 52,53,57,77, 78 in WordCountProcessorApplicationTests.java

I hope someone would invent a way to avoid some of this boilerplate code in this test class example."

This was refererring Sprint annotation of @StreamListener(INPUT_TOPIC), @sendto(OUTPUT_TOPIC)
Which now implement the initialization of stream and forwarding the message TO output_topic.
Now I needed to add the lines 77 an 80 as duplicate code for those. So it would be ideal if there are way to get those to test code without manual writing to test class.

Also these methods readOutput and getOutputList are something you end up easily writing with current kafka-streams-test-utils. That's why I am proposing some addition to Kafka project: https://cwiki.apache.org/confluence/display/KAFKA/KIP-456%3A+Helper+classes+to+make+it+simpler+to+write+test+logic+with+TopologyTestDriver
If this KIP is accepted, those methods would come from kafka test utils already and also those pipeInput lines would be simplified.

@sobychacko
Copy link
Contributor

@jukkakarvanen Ok that makes sense. With the current setup, the StreamListener mechanism will trigger the actual Kafka infrastructure setup. We could probably improve the code in the binder to avoid that setup if we can detect that it is being used with Kafka Streams unit testing framework. But that is a bigger change that goes beyond the scope of this PR. I am hoping that we can come up with some strategy to fix that.
I will merge your PR as this a good starting point showing how we can unit test Kafka Streams processors in Spring Cloud Stream directly using the testing framework.

@sobychacko
Copy link
Contributor

@jukkakarvanen Thank you for the PR. Merged upstream after minor polishing. Looking forward to seeing more PR's from you.

@sobychacko sobychacko closed this Apr 26, 2019
@jukkakarvanen
Copy link
Contributor Author

@sobychacko, You are welcome.

This TopologyTestDriver is now in dependency package org.apache-kafka:kafka-streams-test-utils.
Not seen this referenced in spring-kafka or spring-cloud packages. So now kafka.version property in pom should be kept in sync with spring-kafka-test kafka version.

One improvement idea worth considering: would it be a good idea to add kafka-streams-test-utils as dependence to spring-kafka-test or add separate spring-kafka-streams-test package?

Adding spring-kafka-streams-test to drag in kafka-streams-test-utils would be the ideal solution, but when spring-kafka-test is mainly in test scope, so adding dependency to it would not impact to runtime dependencies needed if kafka streams not used.

@sobychacko
Copy link
Contributor

@garyrussell Is the above ^^ something that we can do with boot dependencies the way we pick up other Kafka dependencies?

@garyrussell
Copy link
Contributor

Yes, we can add a streams test module to the spring-kafka project; but it will also need a change to initializr to pull it in to test scope when KafkaStreams is selected (like is currently done for s-k-test when kafka is selected).

Please open an issue against spring-kafka.

@sobychacko
Copy link
Contributor

@jukkakarvanen ^^

@jukkakarvanen
Copy link
Contributor Author

Issue to spring-kafka added.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants