|
29 | 29 | import org.apache.kafka.clients.consumer.ConsumerRecords; |
30 | 30 | import org.apache.kafka.common.serialization.Serdes; |
31 | 31 | import org.apache.kafka.common.serialization.StringDeserializer; |
32 | | -import org.apache.kafka.streams.Consumed; |
33 | | -import org.apache.kafka.streams.StreamsBuilder; |
34 | 32 | import org.apache.kafka.streams.StreamsConfig; |
35 | 33 | import org.apache.kafka.streams.kstream.KStream; |
36 | | -import org.apache.kafka.streams.kstream.Produced; |
| 34 | +import org.apache.kafka.streams.kstream.KStreamBuilder; |
37 | 35 | import org.junit.Test; |
38 | 36 | import org.junit.runner.RunWith; |
39 | 37 |
|
@@ -150,16 +148,16 @@ public StreamsConfig kStreamsConfigs() { |
150 | 148 |
|
151 | 149 | @Bean |
152 | 150 | @SuppressWarnings("unchecked") |
153 | | - public KStream<String, String> trueFalseStream(StreamsBuilder streamsBuilder) { |
| 151 | + public KStream<String, String> trueFalseStream(KStreamBuilder streamsBuilder) { |
154 | 152 | KStream<String, String> trueFalseStream = streamsBuilder |
155 | | - .stream(TRUE_FALSE_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); |
| 153 | + .stream(Serdes.String(), Serdes.String(), TRUE_FALSE_INPUT_TOPIC); |
156 | 154 |
|
157 | 155 | KStream<String, String>[] branches = |
158 | 156 | trueFalseStream.branch((key, value) -> String.valueOf(true).equals(value), |
159 | 157 | (key, value) -> String.valueOf(false).equals(value)); |
160 | 158 |
|
161 | | - branches[0].to(TRUE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); |
162 | | - branches[1].to(FALSE_TOPIC, Produced.with(Serdes.String(), Serdes.String())); |
| 159 | + branches[0].to(Serdes.String(), Serdes.String(), TRUE_TOPIC); |
| 160 | + branches[1].to(Serdes.String(), Serdes.String(), FALSE_TOPIC); |
163 | 161 |
|
164 | 162 | return trueFalseStream; |
165 | 163 | } |
|
0 commit comments