Skip to content

Commit ff15715

Browse files
committed
Add a test to json serialize both a key and value in a stream.
1 parent 31431fa commit ff15715

File tree

1 file changed

+207
-0
lines changed

1 file changed

+207
-0
lines changed
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Copyright 2017-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.kstream;
18+
19+
import com.fasterxml.jackson.annotation.JsonCreator;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerConfig;
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.clients.producer.ProducerConfig;
26+
import org.apache.kafka.common.serialization.Serde;
27+
import org.apache.kafka.streams.Consumed;
28+
import org.apache.kafka.streams.StreamsBuilder;
29+
import org.apache.kafka.streams.StreamsConfig;
30+
import org.apache.kafka.streams.kstream.KStream;
31+
import org.apache.kafka.streams.kstream.Printed;
32+
import org.apache.kafka.streams.kstream.Produced;
33+
import org.junit.Before;
34+
import org.junit.Test;
35+
import org.junit.runner.RunWith;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.beans.factory.annotation.Value;
38+
import org.springframework.context.annotation.Bean;
39+
import org.springframework.context.annotation.Configuration;
40+
import org.springframework.kafka.annotation.EnableKafkaStreams;
41+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
42+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
43+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
44+
import org.springframework.kafka.core.KafkaTemplate;
45+
import org.springframework.kafka.core.ProducerFactory;
46+
import org.springframework.kafka.support.serializer.JsonSerde;
47+
import org.springframework.kafka.support.serializer.JsonSerializer;
48+
import org.springframework.kafka.test.context.EmbeddedKafka;
49+
import org.springframework.kafka.test.rule.KafkaEmbedded;
50+
import org.springframework.kafka.test.utils.KafkaTestUtils;
51+
import org.springframework.test.annotation.DirtiesContext;
52+
import org.springframework.test.context.junit4.SpringRunner;
53+
54+
import java.util.HashMap;
55+
import java.util.Map;
56+
import java.util.UUID;
57+
58+
import static org.assertj.core.api.Assertions.assertThat;
59+
import static org.springframework.kafka.kstream.KafkaStreamsJsonSerializationTests.JsonObjectKey.jsonObjectKeySerde;
60+
import static org.springframework.kafka.kstream.KafkaStreamsJsonSerializationTests.JsonObjectValue.jsonObjectValueSerde;
61+
62+
/**
63+
* @author Elliot Kennedy
64+
*/
65+
@RunWith(SpringRunner.class)
66+
@DirtiesContext
67+
@EmbeddedKafka(partitions = 1,
68+
topics = {KafkaStreamsJsonSerializationTests.INPUT_TOPIC,
69+
KafkaStreamsJsonSerializationTests.OUTPUT_TOPIC})
70+
public class KafkaStreamsJsonSerializationTests {
71+
72+
public static final String INPUT_TOPIC = "input-topic";
73+
public static final String OUTPUT_TOPIC = "output-topic";
74+
75+
@Autowired
76+
private KafkaTemplate<JsonObjectKey, JsonObjectValue> template;
77+
78+
@Autowired
79+
private KafkaEmbedded kafkaEmbedded;
80+
81+
private Consumer<JsonObjectKey, JsonObjectValue> outputTopicConsumer;
82+
83+
@Before
84+
public void setup() throws Exception {
85+
Map<String, Object> consumerProps =
86+
KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "false", this.kafkaEmbedded);
87+
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
88+
89+
DefaultKafkaConsumerFactory<JsonObjectKey, JsonObjectValue> kafkaConsumerFactory =
90+
new DefaultKafkaConsumerFactory<>(consumerProps, jsonObjectKeySerde().deserializer(), jsonObjectValueSerde().deserializer());
91+
outputTopicConsumer = kafkaConsumerFactory.createConsumer();
92+
kafkaEmbedded.consumeFromAnEmbeddedTopic(outputTopicConsumer, OUTPUT_TOPIC);
93+
}
94+
95+
@Test
96+
public void testJsonSerialiazation() throws Exception {
97+
template.sendDefault(new JsonObjectKey(25), new JsonObjectValue("twenty-five"));
98+
99+
ConsumerRecords<JsonObjectKey, JsonObjectValue> outputTopicRecords = KafkaTestUtils.getRecords(outputTopicConsumer);
100+
101+
assertThat(outputTopicRecords.count()).isEqualTo(1);
102+
ConsumerRecord<JsonObjectKey, JsonObjectValue> output = outputTopicRecords.iterator().next();
103+
assertThat(output.key()).isInstanceOf(JsonObjectKey.class);
104+
assertThat(output.key().getKey()).isEqualTo(25);
105+
assertThat(output.value()).isInstanceOf(JsonObjectValue.class);
106+
assertThat(output.value().getValue()).isEqualTo("twenty-five");
107+
}
108+
109+
public static class JsonObjectKey {
110+
111+
private final Integer key;
112+
113+
@JsonCreator
114+
public JsonObjectKey(@JsonProperty(value = "key", required = true) Integer key) {
115+
this.key = key;
116+
}
117+
118+
public Integer getKey() {
119+
return key;
120+
}
121+
122+
public static Serde<JsonObjectKey> jsonObjectKeySerde() {
123+
return new JsonSerde<>(JsonObjectKey.class);
124+
}
125+
126+
@Override
127+
public String toString() {
128+
return "JsonObjectKey{" +
129+
"key=" + key +
130+
'}';
131+
}
132+
}
133+
134+
public static class JsonObjectValue {
135+
136+
private final String value;
137+
138+
@JsonCreator
139+
public JsonObjectValue(@JsonProperty(value = "value", required = true) String value) {
140+
this.value = value;
141+
}
142+
143+
public String getValue() {
144+
return value;
145+
}
146+
147+
public static Serde<JsonObjectValue> jsonObjectValueSerde() {
148+
return new JsonSerde<>(JsonObjectValue.class);
149+
}
150+
151+
@Override
152+
public String toString() {
153+
return "JsonObjectValue{" +
154+
"value='" + value + '\'' +
155+
'}';
156+
}
157+
}
158+
159+
@Configuration
160+
@EnableKafkaStreams
161+
public static class Config {
162+
163+
@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
164+
private String brokerAddresses;
165+
166+
@Bean
167+
public ProducerFactory<JsonObjectKey, JsonObjectValue> producerFactory() {
168+
return new DefaultKafkaProducerFactory<>(producerConfigs());
169+
}
170+
171+
@Bean
172+
public Map<String, Object> producerConfigs() {
173+
Map<String, Object> senderProps = KafkaTestUtils.senderProps(this.brokerAddresses);
174+
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
175+
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
176+
return senderProps;
177+
}
178+
179+
@Bean
180+
public KafkaTemplate<?, ?> kafkaTemplate() {
181+
KafkaTemplate<JsonObjectKey, JsonObjectValue> kafkaTemplate = new KafkaTemplate<>(producerFactory());
182+
kafkaTemplate.setDefaultTopic(INPUT_TOPIC);
183+
return kafkaTemplate;
184+
}
185+
186+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
187+
public StreamsConfig kStreamsConfigs() {
188+
Map<String, Object> props = new HashMap<>();
189+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
190+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
191+
return new StreamsConfig(props);
192+
}
193+
194+
@Bean
195+
@SuppressWarnings("unchecked")
196+
public KStream<JsonObjectKey, JsonObjectValue> testJsonSerializationStream(StreamsBuilder streamsBuilder) {
197+
KStream<JsonObjectKey, JsonObjectValue> testStream = streamsBuilder
198+
.stream(INPUT_TOPIC, Consumed.with(jsonObjectKeySerde(), jsonObjectValueSerde()));
199+
200+
testStream.print(Printed.toSysOut());
201+
testStream.to(OUTPUT_TOPIC, Produced.with(jsonObjectKeySerde(), jsonObjectValueSerde()));
202+
203+
return testStream;
204+
}
205+
}
206+
207+
}

0 commit comments

Comments
 (0)