Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Missing the ConsumerSeekAware interface #812

Merged
merged 41 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
0bda9c8
Implement kafka seek mechanisms
guillermocalvo Aug 14, 2023
4bf5a5d
Add tests
guillermocalvo Aug 14, 2023
052d21f
Replace deprecated `KafkaConsumerAware` -> `ConsumerAware`
guillermocalvo Aug 15, 2023
388a31b
Fix typo `producer` -> `consumer`
guillermocalvo Aug 15, 2023
be32a75
Replace existing example `[source,java]` -> `snippet`
guillermocalvo Aug 15, 2023
99359cd
Add documentation for `ConsumerSeekAware`
guillermocalvo Aug 15, 2023
12debce
Add documentation for `KafkaSeekOperations`
guillermocalvo Aug 15, 2023
d915419
Add `@NonNull` annotations
guillermocalvo Aug 17, 2023
5b3e36d
Turn into a one-liner
guillermocalvo Aug 17, 2023
3b8a0a5
Fix log message
guillermocalvo Aug 17, 2023
c189fe2
Update javadoc
guillermocalvo Aug 17, 2023
54945a6
Use a record instead of a Java class
guillermocalvo Aug 17, 2023
a573357
Use a Kotlin data class instead of a regular class
guillermocalvo Aug 17, 2023
e1e12d1
Use `void` instead of `def`
guillermocalvo Aug 17, 2023
f2341a1
Split section into multiple documents
guillermocalvo Aug 17, 2023
9968baf
Make the Groovy snippet more idiomatic
guillermocalvo Aug 17, 2023
4faab8e
Add tests for the code samples in all test suites
guillermocalvo Aug 17, 2023
1dfc72f
Update tests
guillermocalvo Aug 17, 2023
852e9f6
Merge remote-tracking branch 'remotes/origin/master' into 46-missing-…
guillermocalvo Aug 17, 2023
a0fc4d0
rename back to old name
sdelamo Aug 18, 2023
2b97558
Split Kafka seek in sections
sdelamo Aug 18, 2023
1d517f6
Merge branch 'master' into 46-missing-the-consumerseekaware-interface
sdelamo Aug 18, 2023
b6abd6c
Apply suggestions from code review
guillermocalvo Aug 18, 2023
99bf01a
Use a simple `if`
guillermocalvo Aug 18, 2023
c243281
Refactor `DefaultKafkaSeeker::perform`
guillermocalvo Aug 18, 2023
944d29a
Refactor trait interface into static methods
guillermocalvo Aug 18, 2023
15b9ba3
Refactor cognitive complexity `DefaultKafkaSeeker::perform`
guillermocalvo Aug 18, 2023
c7cad51
Merge branch 'master' into 46-missing-the-consumerseekaware-interface
sdelamo Aug 18, 2023
9cbba67
remove MY_KAFKA stop
sdelamo Aug 18, 2023
052cee0
extract methods
sdelamo Aug 18, 2023
56a048a
Make checkstyle happy again.
guillermocalvo Aug 18, 2023
4785378
Revert "remove MY_KAFKA stop"
guillermocalvo Aug 18, 2023
6c7c396
Merge branch 'master' into 46-missing-the-consumerseekaware-interface
sdelamo Aug 18, 2023
0aba52f
remove abstractKafkatest
sdelamo Aug 18, 2023
9d5bcb3
Merge remote-tracking branch 'remotes/origin/master' into 46-missing-…
guillermocalvo Aug 21, 2023
040d4e9
Keep tests that aren't based on test-resources as they are
guillermocalvo Aug 21, 2023
5460b78
Refactor multi-language tests
guillermocalvo Aug 21, 2023
71632bb
Remove `@Inject` annotations
guillermocalvo Aug 22, 2023
5d50906
Log messages as `debug` instead of `info`
guillermocalvo Aug 22, 2023
6647ef5
Use nullable `Boolean` instead of `Optional`
guillermocalvo Aug 22, 2023
b4044cf
Revert "Use nullable `Boolean` instead of `Optional`"
guillermocalvo Aug 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.seek.KafkaSeekOperation;
import io.micronaut.configuration.kafka.seek.KafkaSeeker;
import io.micronaut.core.annotation.NonNull;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;

/**
* Interface for {@link KafkaListener} instances to implement if they wish to perform
* {@link KafkaSeekOperation seek operations} when the set of partitions assigned to the
* {@link Consumer} changes.
*
* <p>This callback interface is based on {@link ConsumerRebalanceListener} and it provides a
* {@link KafkaSeeker} object that can perform {@link KafkaSeekOperation} instances immediately.</p>
*
* @author Guillermo Calvo
* @see ConsumerRebalanceListener
* @see KafkaSeekOperation
* @see KafkaSeeker
* @since 4.1
*/
@FunctionalInterface
public interface ConsumerSeekAware {
guillermocalvo marked this conversation as resolved.
Show resolved Hide resolved

/**
* A callback method the user can implement to provide handling of customized offsets
* on completion of a successful partition re-assignment.
*
* <p>This method will be called after the partition re-assignment completes and before the
* consumer starts fetching data, and only as the result of a
* {@link Consumer#poll(Duration) poll(long)} call.
* Under normal conditions, {@link #onPartitionsRevoked(Collection)} will be executed before
* {@link #onPartitionsAssigned(Collection, KafkaSeeker)}.</p>
*
* <p>The provided {@link KafkaSeeker} object can perform
* {@link KafkaSeekOperation seek operations} on the underlying consumer.</p>
*
* @param partitions The list of partitions that are now assigned to the consumer
* @param seeker The object that can perform {@link KafkaSeekOperation seek operations}
* @see ConsumerRebalanceListener#onPartitionsAssigned(Collection)
*/
void onPartitionsAssigned(@NonNull Collection<TopicPartition> partitions, @NonNull KafkaSeeker seeker);

/**
* @see ConsumerRebalanceListener#onPartitionsRevoked(Collection)
* @param partitions The list of partitions that were assigned to the consumer and now need to
* be revoked (may not include all currently assigned partitions, i.e. there
* may still be some partitions left)
*/
default void onPartitionsRevoked(@NonNull Collection<TopicPartition> partitions) { }

/**
* @see ConsumerRebalanceListener#onPartitionsLost(Collection)
* @param partitions The list of partitions that are now assigned to the consumer (previously
* owned partitions will NOT be included, i.e. this list will only include
* newly added partitions)
*/
default void onPartitionsLost(@NonNull Collection<TopicPartition> partitions) {
onPartitionsRevoked(partitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.ConsumerRegistry;
import io.micronaut.configuration.kafka.ConsumerSeekAware;
import io.micronaut.configuration.kafka.KafkaAcknowledgement;
import io.micronaut.configuration.kafka.KafkaMessage;
import io.micronaut.configuration.kafka.ProducerRegistry;
Expand All @@ -37,6 +38,8 @@
import io.micronaut.configuration.kafka.event.KafkaConsumerSubscribedEvent;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.seek.KafkaSeekOperations;
import io.micronaut.configuration.kafka.seek.KafkaSeeker;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
Expand Down Expand Up @@ -462,6 +465,9 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
final Optional<Argument<?>> consumerArg = Arrays.stream(method.getArguments())
.filter(arg -> Consumer.class.isAssignableFrom(arg.getType()))
.findFirst();
final Optional<Argument<?>> seekArg = Arrays.stream(method.getArguments())
.filter(arg -> KafkaSeekOperations.class.isAssignableFrom(arg.getType()))
.findFirst();
final Optional<Argument<?>> ackArg = Arrays.stream(method.getArguments())
.filter(arg -> Acknowledgement.class.isAssignableFrom(arg.getType()))
.findFirst();
Expand Down Expand Up @@ -525,7 +531,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
if (isBatch) {
failed = !processConsumerRecordsAsBatch(consumerState, method, boundArguments, consumerRecords);
} else {
failed = !processConsumerRecords(consumerState, method, boundArguments, trackPartitions, ackArg, consumerRecords);
failed = !processConsumerRecords(consumerState, method, boundArguments, trackPartitions, seekArg, ackArg, consumerRecords);
}
if (!failed) {
if (consumerState.offsetStrategy == OffsetStrategy.SYNC) {
Expand Down Expand Up @@ -561,6 +567,7 @@ private boolean processConsumerRecords(final ConsumerState consumerState,
final ExecutableMethod<?, ?> method,
final Map<Argument<?>, Object> boundArguments,
final boolean trackPartitions,
final Optional<Argument<?>> seekArg,
final Optional<Argument<?>> ackArg,
final ConsumerRecords<?, ?> consumerRecords) {
final ExecutableBinder<ConsumerRecord<?, ?>> executableBinder = new DefaultExecutableBinder<>(boundArguments);
Expand All @@ -581,6 +588,8 @@ private boolean processConsumerRecords(final ConsumerState consumerState,
}

Consumer<?, ?> kafkaConsumer = consumerState.kafkaConsumer;
final KafkaSeekOperations seek = seekArg.map(x -> KafkaSeekOperations.newInstance()).orElse(null);
seekArg.ifPresent(argument -> boundArguments.put(argument, seek));
ackArg.ifPresent(argument -> boundArguments.put(argument, (KafkaAcknowledgement) () -> kafkaConsumer.commitSync(currentOffsets)));

try {
Expand Down Expand Up @@ -614,6 +623,11 @@ private boolean processConsumerRecords(final ConsumerState consumerState,
} else if (consumerState.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) {
kafkaConsumer.commitAsync(currentOffsets, resolveCommitCallback(consumerState.consumerBean));
}
if (seek != null) {
// Performs seek operations that were deferred by the user
final KafkaSeeker seeker = KafkaSeeker.newInstance(kafkaConsumer);
seek.forEach(seeker::perform);
}
}
return true;
}
Expand Down Expand Up @@ -760,7 +774,9 @@ private static void setupConsumerSubscription(final ExecutableMethod<?, ?> metho

if (hasTopics) {
final List<String> topics = Arrays.asList(topicNames);
if (consumerBean instanceof ConsumerRebalanceListener crl) {
if (consumerBean instanceof ConsumerSeekAware csa) {
kafkaConsumer.subscribe(topics, new ConsumerSeekAwareAdapter(KafkaSeeker.newInstance(kafkaConsumer), csa));
} else if (consumerBean instanceof ConsumerRebalanceListener crl) {
kafkaConsumer.subscribe(topics, crl);
} else {
kafkaConsumer.subscribe(topics);
Expand All @@ -779,7 +795,9 @@ private static void setupConsumerSubscription(final ExecutableMethod<?, ?> metho
} catch (Exception e) {
throw new MessagingSystemException("Invalid topic pattern [" + pattern + "] for method [" + method + "]: " + e.getMessage(), e);
}
if (consumerBean instanceof ConsumerRebalanceListener crl) {
if (consumerBean instanceof ConsumerSeekAware csa) {
kafkaConsumer.subscribe(compiledPattern, new ConsumerSeekAwareAdapter(KafkaSeeker.newInstance(kafkaConsumer), csa));
} else if (consumerBean instanceof ConsumerRebalanceListener crl) {
kafkaConsumer.subscribe(compiledPattern, crl);
} else {
kafkaConsumer.subscribe(compiledPattern);
Expand Down Expand Up @@ -1258,4 +1276,17 @@ private enum ConsumerCloseState {
NOT_STARTED, POLLING, CLOSED
}

private record ConsumerSeekAwareAdapter(@NonNull KafkaSeeker seeker, @NonNull ConsumerSeekAware bean)
implements ConsumerRebalanceListener {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
bean.onPartitionsRevoked(partitions != null ? partitions : Collections.emptyList());
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
bean.onPartitionsAssigned(partitions != null ? partitions : Collections.emptyList(), seeker);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.seek;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import org.apache.kafka.common.TopicPartition;

import java.util.Objects;

/**
* Default implementation of {@link KafkaSeekOperation}.
*
* @param topicPartition the topic name and partition number on which the seek should be performed.
* @param offsetType the offset type
* @param offset the offset that should be used to perform the seek. Must be positive
* @author Guillermo Calvo
* @see KafkaSeekOperation
* @since 4.1
*/
@Internal
record DefaultKafkaSeekOperation(
@NonNull TopicPartition topicPartition, @NonNull OffsetType offsetType, long offset
) implements KafkaSeekOperation {

/**
* Creates a new instance.
*
* @param topicPartition the topic name and partition number on which the seek should be performed
* @param offsetType the offset type
* @param offset the offset that should be used to perform the seek. Must be positive
*/
public DefaultKafkaSeekOperation {
Objects.requireNonNull(topicPartition, "topicPartition");
Objects.requireNonNull(topicPartition.topic(), "topicPartition.topic");
Objects.requireNonNull(offsetType, "offsetType");
if (offset < 0) {
throw new IllegalArgumentException("Negative offset");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.seek;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/**
* Default implementation of {@link KafkaSeekOperations}.
*
* @author Guillermo Calvo
* @see KafkaSeekOperations
* @since 4.1
*/
@Internal
final class DefaultKafkaSeekOperations implements KafkaSeekOperations {

private final List<KafkaSeekOperation> operations = new ArrayList<>();

@Override
@NonNull
public Iterator<KafkaSeekOperation> iterator() {
return operations.iterator();
}

@Override
public void defer(@NonNull KafkaSeekOperation operation) {
operations.add(Objects.requireNonNull(operation, "operation"));
}
}
Loading