Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
sangreal committed May 4, 2024
1 parent 7288b3c commit ed0bcc6
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class PartitionState<K, V> {
* storage
*/
@NonNull
// Setter is package-private so tests can inject a pre-populated offsets map.
@Setter(PACKAGE)
private ConcurrentSkipListMap<Long, Optional<ConsumerRecord<K, V>>> incompleteOffsets;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.micrometer.core.instrument.Gauge;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -69,7 +70,8 @@ public class ShardManager<K, V> {
*/
// performance: could disable/remove if using partition order - but probably not worth the added complexity in the code to handle an extra special case
@Getter(AccessLevel.PRIVATE)
// Field is no longer final: a package-private setter lets tests inject a prepared shard map.
@Setter(AccessLevel.PACKAGE)
private Map<ShardKey, ProcessingShard<K, V>> processingShards = new ConcurrentHashMap<>();

/**
* TreeSet is a Set, so must ensure that we are consistent with equalTo in our comparator - so include the full id -
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@
*/

import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import pl.tlinkowski.unij.api.UniLists;

import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

import static com.google.common.truth.Truth.assertThat;
import static pl.tlinkowski.unij.api.UniLists.of;
Expand All @@ -19,6 +28,42 @@
class ShardManagerTest {

ModelUtils mu = new ModelUtils();
// Partition state under test; created fresh per test in setup().
PartitionState<String, String> state;
// Work manager obtained from the test module in setup().
WorkManager<String, String> wm;

String topic = "myTopic";
int partition = 0;

// The single topic-partition used by these tests.
TopicPartition tp = new TopicPartition(topic, partition);

// Map of incomplete offsets that tests inject into the partition state.
ConcurrentSkipListMap<Long, Optional<ConsumerRecord<String, String>>> incompleteOffsets = new ConcurrentSkipListMap<>();

@BeforeEach
void setup() {
    // Fresh partition state for the test topic-partition, starting with no incompletes.
    state = new PartitionState<>(0, mu.getModule(), tp, OffsetMapCodecManager.HighestOffsetAndIncompletes.of());
    // Work manager from the test module; assign the partition so it is tracked.
    wm = mu.getModule().workManager();
    wm.onPartitionsAssigned(of(tp));
}

@Test
void testAssignedQuickRevokeNPE() {
    // Regression test for https://github.com/confluentinc/parallel-consumer/issues/757
    // Scenario:
    //  1. partition is assigned and incompleteOffsets is already populated
    //  2. the partition is revoked right before polling/processing begins
    //  3. the processing shard therefore holds no work entry for the record yet
    //  4. on revoke, removing incompleteOffsets entries must not NPE when the
    //     shard has no matching entry for the record
    PCModuleTestEnv module = mu.getModule();
    ShardManager<String, String> sm = new ShardManager<>(module, module.workManager());
    ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>(topic, partition, 1, null, "test1");
    // compute the key once - the original recomputed ShardKey.ofKey three times
    ShardKey shardKey = ShardKey.ofKey(consumerRecord);

    Map<ShardKey, ProcessingShard<String, String>> processingShards = new ConcurrentHashMap<>();
    processingShards.put(shardKey, new ProcessingShard<>(shardKey, module.options(), wm.getPm()));
    sm.setProcessingShards(processingShards);
    incompleteOffsets.put(1L, Optional.of(consumerRecord));
    state.setIncompleteOffsets(incompleteOffsets);
    // must complete without throwing, and leave the shard removed
    state.onPartitionsRemoved(sm);
    assertThat(sm.getShard(shardKey)).isEmpty();
}

@Test
void retryQueueOrdering() {
Expand Down

0 comments on commit ed0bcc6

Please sign in to comment.