-
Notifications
You must be signed in to change notification settings - Fork 1.7k
GH-2942: Kafka Streams queryable stores #3022
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Fixes: spring-projects#2942 This is an initial iteration for providing a basic API around interactive query service in Kafka Streams. In this iteration, we introduce a single API for retrieving the queryable state store from the Kafka Streams topology, namely, retrieveQueryableStore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checkstyle violation:
Error: eckstyle] [ERROR] /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java:96:25: Reference to instance variable 'retryTemplate' needs "this.". [RequireThis]
Error: eckstyle] [ERROR] /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java:98:25: Reference to instance variable 'retryTemplate' needs "this.". [RequireThis]
| * {@link KafkaStreams} under consideration. | ||
| * | ||
| * @author Soby Chacko | ||
| * @since 3.2.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just 3.2 for consistency with the rest of styles.
...fka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java
Show resolved
Hide resolved
| private KafkaStreams kafkaStreams; | ||
|
|
||
| /** | ||
| * Constructs an instance for querying state stores from the KafkaStreams in the {@link StreamsBuilderFactoryBean}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method Javadocs must be imperative: https://github.com/spring-projects/spring-framework/wiki/Code-Style#javadoc-formatting
Plus no blank lines in the method Javadocs.
...fka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java
Show resolved
Hide resolved
| /** | ||
| * Underlying {@link KafkaStreams} from {@link StreamsBuilderFactoryBean}. | ||
| */ | ||
| private KafkaStreams kafkaStreams; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this has to be volatile since you initialize this property in lazy manner on demand.
| } | ||
| Assert.notNull(this.kafkaStreams, "KafkaStreams cannot be null"); | ||
| StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType); | ||
| AtomicReference<StoreQueryParameters<T>> storeQueryParametersReference = new AtomicReference<>(storeQueryParams); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure in this extra reference: that storeQueryParams is effectively already final.
...rc/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java
Show resolved
Hide resolved
| QueryableStoreTypes.keyValueStore()); | ||
|
|
||
| assertThat(objectObjectReadOnlyKeyValueStore.get(123)).isNotNull(); | ||
| assertThat((Long) objectObjectReadOnlyKeyValueStore.get(123) >= 1L).isTrue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to call get() for the same value twice.
There is also isGreaterThanOrEqualTo() instead of that isTrue() for more precise reporting (if that).
| .retrieveQueryableStore(NON_EXISTENT_STORE, QueryableStoreTypes.keyValueStore()); | ||
| } | ||
| catch (Exception e) { | ||
| exc = e; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to use assertThatException() instead of try..catch: must better reporting.
...rc/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are good so far.
We can turn this into a normal PR and go ahead with docs.
Thanks
...fka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the whats-new.adoc has to mention this new feature as well.
Thanks
| if (this.kafkaStreams == null) { | ||
| this.kafkaStreams = this.streamsBuilderFactoryBean.getKafkaStreams(); | ||
| } | ||
| Assert.notNull(this.kafkaStreams, "KafkaStreams cannot be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's improve this assert with something like ". Was provided StreamsBuilderFactoryBean been started?"!
|
|
||
| /** | ||
| * Retrieve and return a queryable store by name created in the application. | ||
| * |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No blank line.
...fka/src/main/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryService.java
Show resolved
Hide resolved
...rc/test/java/org/springframework/kafka/streams/KafkaStreamsInteractiveQueryServiceTests.java
Show resolved
Hide resolved
| A new API `KafkaStreamsInteractiveQuerySupport` for accessing queryable stores used in Kafka Streams interactive queries. | ||
| See xref:streams.adoc#kafka-streams-iq-support[Kafka Streams Interactive Support] for more details. | ||
|
|
||
| A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this paragraph is a copy/paste artifact.
Will fix it shortly from GH GUI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad. Sorry.
Fixes: #2942
This is an initial iteration for providing a basic API around interactive query service in Kafka Streams. In this iteration, we introduce a single API for retrieving the queryable state store from the Kafka Streams topology, namely, retrieveQueryableStore.