Skip to content

Commit f6bfd80

Browse files
committed
GH-2942: Kafka Streams queryable stores
Fixes: #2942 This is an initial iteration for providing a basic API around interactive query service in Kafka Streams. In this iteration, we introduce a single API for retrieving the queryable state store from the Kafka Streams topology, namely, retrieveQueryableStore.
1 parent addfaf0 commit f6bfd80

File tree

2 files changed

+352
-0
lines changed

2 files changed

+352
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.streams;
18+
19+
import java.util.concurrent.atomic.AtomicReference;
20+
21+
import org.apache.kafka.streams.KafkaStreams;
22+
import org.apache.kafka.streams.StoreQueryParameters;
23+
import org.apache.kafka.streams.state.QueryableStoreType;
24+
25+
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
26+
import org.springframework.retry.RetryPolicy;
27+
import org.springframework.retry.backoff.FixedBackOffPolicy;
28+
import org.springframework.retry.policy.SimpleRetryPolicy;
29+
import org.springframework.retry.support.RetryTemplate;
30+
import org.springframework.util.Assert;
31+
32+
/**
33+
* Provide a wrapper API around the interactive query stores in Kafka Streams.
34+
* Using this API, an application can gain access to a named state store in the
35+
* {@link KafkaStreams} under consideration.
36+
*
37+
* @author Soby Chacko
38+
* @since 3.2.0
39+
*/
40+
public class KafkaStreamsInteractiveQueryService {
41+
42+
private static final int DEFAULT_MAX_ATTEMPTS = 1;
43+
/**
44+
* {@link StreamsBuilderFactoryBean} that provides {@link KafkaStreams} where the state store is retrieved from.
45+
*/
46+
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
47+
48+
/**
49+
* {@link RetryTemplate} to be used by the interative query service.
50+
*/
51+
private RetryTemplate retryTemplate = new RetryTemplate();
52+
53+
/**
54+
* Underlying {@link KafkaStreams} from {@link StreamsBuilderFactoryBean}.
55+
*/
56+
private KafkaStreams kafkaStreams;
57+
58+
/**
59+
* Constructs an instance for querying state stores from the KafkaStreams in the {@link StreamsBuilderFactoryBean}.
60+
*
61+
* @param streamsBuilderFactoryBean {@link StreamsBuilderFactoryBean} for {@link KafkaStreams}.
62+
*/
63+
public KafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
64+
this.streamsBuilderFactoryBean = streamsBuilderFactoryBean;
65+
}
66+
67+
/**
68+
* Retrieve and return a queryable store by name created in the application.
69+
*
70+
* @param storeName name of the queryable store
71+
* @param storeType type of the queryable store
72+
* @param <T> generic type for the queryable store
73+
* @return queryable store.
74+
*/
75+
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType) {
76+
if (this.kafkaStreams == null) {
77+
this.kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams();
78+
}
79+
Assert.notNull(this.kafkaStreams, "KafkaStreams cannot be null");
80+
StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType);
81+
AtomicReference<StoreQueryParameters<T>> storeQueryParametersReference = new AtomicReference<>(storeQueryParams);
82+
83+
return getRetryTemplate().execute(context -> {
84+
try {
85+
return this.kafkaStreams.store(storeQueryParametersReference.get());
86+
}
87+
catch (Exception e) {
88+
throw new IllegalStateException("Error retrieving state store: " + storeName, e);
89+
}
90+
});
91+
}
92+
93+
private RetryTemplate getRetryTemplate() {
94+
if (this.retryTemplate == null) {
95+
this.retryTemplate = new RetryTemplate();
96+
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
97+
RetryPolicy retryPolicy = new SimpleRetryPolicy(DEFAULT_MAX_ATTEMPTS);
98+
retryTemplate.setRetryPolicy(retryPolicy);
99+
}
100+
return this.retryTemplate;
101+
}
102+
103+
public void setRetryTemplate(RetryTemplate retryTemplate) {
104+
this.retryTemplate = retryTemplate;
105+
}
106+
}
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.streams;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.spy;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
24+
25+
import java.lang.reflect.Field;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import org.apache.kafka.clients.consumer.ConsumerRecord;
32+
import org.apache.kafka.common.serialization.Serdes;
33+
import org.apache.kafka.streams.KafkaStreams;
34+
import org.apache.kafka.streams.KeyValue;
35+
import org.apache.kafka.streams.StoreQueryParameters;
36+
import org.apache.kafka.streams.StreamsBuilder;
37+
import org.apache.kafka.streams.StreamsConfig;
38+
import org.apache.kafka.streams.kstream.Grouped;
39+
import org.apache.kafka.streams.kstream.KStream;
40+
import org.apache.kafka.streams.kstream.Materialized;
41+
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
42+
import org.apache.kafka.streams.state.QueryableStoreTypes;
43+
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
44+
import org.junit.jupiter.api.Test;
45+
46+
import org.springframework.beans.factory.annotation.Autowired;
47+
import org.springframework.beans.factory.annotation.Value;
48+
import org.springframework.context.annotation.Bean;
49+
import org.springframework.context.annotation.Configuration;
50+
import org.springframework.kafka.annotation.EnableKafka;
51+
import org.springframework.kafka.annotation.EnableKafkaStreams;
52+
import org.springframework.kafka.annotation.KafkaListener;
53+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
54+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
55+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
56+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
57+
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
58+
import org.springframework.kafka.core.ConsumerFactory;
59+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
60+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
61+
import org.springframework.kafka.core.KafkaTemplate;
62+
import org.springframework.kafka.core.ProducerFactory;
63+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
64+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
65+
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
66+
import org.springframework.kafka.test.context.EmbeddedKafka;
67+
import org.springframework.kafka.test.utils.KafkaTestUtils;
68+
import org.springframework.retry.RetryPolicy;
69+
import org.springframework.retry.backoff.FixedBackOffPolicy;
70+
import org.springframework.retry.policy.SimpleRetryPolicy;
71+
import org.springframework.retry.support.RetryTemplate;
72+
import org.springframework.test.annotation.DirtiesContext;
73+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
74+
75+
/**
76+
* @author Soby Chacko
77+
* @since 3.2.0
78+
*/
79+
@SpringJUnitConfig
80+
@DirtiesContext
81+
@EmbeddedKafka(partitions = 1,
82+
topics = { "iqs-test-in", "iqs-test-out" })
83+
class KafkaStreamsInteractiveQueryServiceTests {
84+
85+
public static final String IQS_TEST_IN = "iqs-test-in";
86+
public static final String IQS_TEST_OUT = "iqs-test-out";
87+
public static final String STATE_STORE = "my-state-store";
88+
public static final String NON_EXISTENT_STORE = "my-non-existent-store";
89+
90+
@Autowired
91+
private EmbeddedKafkaKraftBroker embeddedKafka;
92+
93+
@Autowired
94+
private StreamsBuilderFactoryBean streamsBuilderFactoryBean;
95+
96+
@Autowired
97+
private KafkaTemplate<Integer, String> kafkaTemplate;
98+
99+
@Autowired
100+
private KafkaStreamsInteractiveQueryService interactiveQueryService;
101+
102+
@Autowired
103+
private CompletableFuture<ConsumerRecord<?, String>> resultFuture;
104+
105+
@Test
106+
void retrieveQueryableStore() throws Exception {
107+
this.kafkaTemplate.sendDefault(123, "123");
108+
this.kafkaTemplate.flush();
109+
110+
ConsumerRecord<?, String> result = resultFuture.get(600, TimeUnit.SECONDS);
111+
assertThat(result).isNotNull();
112+
113+
final ReadOnlyKeyValueStore<Object, Object> objectObjectReadOnlyKeyValueStore = this.interactiveQueryService
114+
.retrieveQueryableStore(STATE_STORE,
115+
QueryableStoreTypes.keyValueStore());
116+
117+
assertThat(objectObjectReadOnlyKeyValueStore.get(123)).isNotNull();
118+
assertThat((Long) objectObjectReadOnlyKeyValueStore.get(123) >= 1L).isTrue();
119+
}
120+
121+
@SuppressWarnings("unchecked")
122+
@Test
123+
void retrieveNonExistentStateStoreAndVerifyRetries() throws Exception {
124+
this.kafkaTemplate.sendDefault(123, "123");
125+
this.kafkaTemplate.flush();
126+
127+
ConsumerRecord<?, String> result = resultFuture.get(600, TimeUnit.SECONDS);
128+
assertThat(result).isNotNull();
129+
130+
assertThat(this.streamsBuilderFactoryBean.getKafkaStreams()).isNotNull();
131+
final KafkaStreams kafkaStreams = spy(this.streamsBuilderFactoryBean.getKafkaStreams());
132+
assertThat(kafkaStreams).isNotNull();
133+
134+
Field kafkaStreamsField = KafkaStreamsInteractiveQueryService.class.getDeclaredField("kafkaStreams");
135+
kafkaStreamsField.setAccessible(true);
136+
kafkaStreamsField.set(interactiveQueryService, kafkaStreams);
137+
138+
Throwable exc = null;
139+
try {
140+
this.interactiveQueryService
141+
.retrieveQueryableStore(NON_EXISTENT_STORE, QueryableStoreTypes.keyValueStore());
142+
}
143+
catch (Exception e) {
144+
exc = e;
145+
}
146+
assertThat(exc).isNotNull();
147+
assertThat(exc.getMessage()).contains("Error retrieving state store: my-non-existent-store");
148+
149+
verify(kafkaStreams, times(3)).store(any(StoreQueryParameters.class));
150+
}
151+
152+
@Configuration
153+
@EnableKafka
154+
@EnableKafkaStreams
155+
public static class KafkaStreamsConfig {
156+
157+
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
158+
private String brokerAddresses;
159+
160+
@Bean
161+
public ProducerFactory<Integer, String> producerFactory() {
162+
return new DefaultKafkaProducerFactory<>(producerConfigs());
163+
}
164+
165+
@Bean
166+
public Map<String, Object> producerConfigs() {
167+
return KafkaTestUtils.producerProps(this.brokerAddresses);
168+
}
169+
170+
@Bean
171+
public KafkaTemplate<Integer, String> template() {
172+
KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory(), true);
173+
kafkaTemplate.setDefaultTopic("iqs-test-in");
174+
return kafkaTemplate;
175+
}
176+
177+
@Bean
178+
public Map<String, Object> consumerConfigs() {
179+
return KafkaTestUtils.consumerProps(this.brokerAddresses, "testGroup",
180+
"false");
181+
}
182+
183+
@Bean
184+
public ConsumerFactory<Integer, String> consumerFactory() {
185+
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
186+
}
187+
188+
@Bean
189+
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
190+
kafkaListenerContainerFactory() {
191+
192+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
193+
new ConcurrentKafkaListenerContainerFactory<>();
194+
factory.setConsumerFactory(consumerFactory());
195+
return factory;
196+
}
197+
198+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
199+
public KafkaStreamsConfiguration kStreamsConfigs() {
200+
Map<String, Object> props = new HashMap<>();
201+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "iqs-testStreams");
202+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
203+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
204+
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
205+
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
206+
WallclockTimestampExtractor.class.getName());
207+
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
208+
return new KafkaStreamsConfiguration(props);
209+
}
210+
211+
@Bean
212+
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
213+
KStream<Integer, String> input = kStreamBuilder.stream(IQS_TEST_IN);
214+
final KStream<Integer, String> outbound = input.filter((key, value) -> key == 123)
215+
.map((key, value) -> new KeyValue<>(123, value))
216+
.groupByKey(Grouped.with(new Serdes.IntegerSerde(),
217+
new Serdes.StringSerde()))
218+
.count(Materialized.as(STATE_STORE)).toStream()
219+
.map((key, value) -> new KeyValue<>(key, "Count for ID 123: " + value));
220+
outbound.to(IQS_TEST_OUT);
221+
return outbound;
222+
}
223+
224+
@Bean
225+
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
226+
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
227+
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
228+
RetryTemplate retryTemplate = new RetryTemplate();
229+
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
230+
RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
231+
retryTemplate.setRetryPolicy(retryPolicy);
232+
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
233+
return kafkaStreamsInteractiveQueryService;
234+
}
235+
236+
@Bean
237+
public CompletableFuture<ConsumerRecord<?, String>> resultFuture() {
238+
return new CompletableFuture<>();
239+
}
240+
241+
@KafkaListener(topics = "iqs-test-out")
242+
public void listener(ConsumerRecord<?, String> payload) {
243+
resultFuture().complete(payload);
244+
}
245+
}
246+
}

0 commit comments

Comments
 (0)