Commit

tests: Demonstrates how to use MockConsumer with PC for issue #176
astubbs committed Nov 3, 2022
1 parent f19083d commit eba78dd
Showing 1 changed file with 79 additions and 0 deletions.
@@ -0,0 +1,79 @@
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.LongPollingMockConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import static com.google.common.truth.Truth.assertThat;
import static pl.tlinkowski.unij.api.UniLists.of;

/**
 * Tests that PC works fine with the plain vanilla {@link MockConsumer}, as opposed to the
 * {@link LongPollingMockConsumer}.
 * <p>
 * These tests demonstrate why using {@link MockConsumer} is difficult, and why {@link LongPollingMockConsumer} should
 * be used instead.
 *
 * @author Antony Stubbs
 * @see LongPollingMockConsumer#revokeAssignment
 */
@Slf4j
class MockConsumerTest {//} extends ParallelEoSStreamProcessorTestBase {

    private final String topic = MockConsumerTest.class.getSimpleName();

    /**
     * Tests that PC can consume from a plain {@link MockConsumer}, provided the rebalance is simulated manually.
     */
    @Test
    void mockConsumer() {
        var mockConsumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST);
        HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
        TopicPartition tp = new TopicPartition(topic, 0);
        startOffsets.put(tp, 0L);

        // wire PC up to the MockConsumer
        var options = ParallelConsumerOptions.<String, String>builder()
                .consumer(mockConsumer)
                .build();
        var parallelConsumer = new ParallelEoSStreamProcessor<String, String>(options);
        parallelConsumer.subscribe(of(topic));

        // MockConsumer is not a complete implementation of the Consumer contract - the rebalance and assignment must
        // be triggered manually - or use LongPollingMockConsumer instead
        mockConsumer.rebalance(Collections.singletonList(tp));
        parallelConsumer.onPartitionsAssigned(of(tp));
        mockConsumer.updateBeginningOffsets(startOffsets);

        // feed some records into the mock consumer
        addRecords(mockConsumer);

        // process the records, collecting each one as it is handled
        ConcurrentLinkedQueue<RecordContext<String, String>> records = new ConcurrentLinkedQueue<>();
        parallelConsumer.poll(recordContexts -> {
            recordContexts.forEach(recordContext -> {
                log.warn("Processing: {}", recordContext);
                records.add(recordContext);
            });
        });

        // wait until all three records have been processed
        Awaitility.await().untilAsserted(() -> {
            assertThat(records).hasSize(3);
        });
    }

    private void addRecords(MockConsumer<String, String> mockConsumer) {
        mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 0, "key", "value"));
        mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 1, "key", "value"));
        mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 2, "key", "value"));
    }

}
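For contrast with the manual rebalance steps above, the sketch below shows how the same test could be wired with LongPollingMockConsumer, which the class javadoc recommends instead. This is a sketch only and is not part of the commit: it assumes LongPollingMockConsumer is a drop-in subclass of MockConsumer and that a helper along the lines of subscribeWithRebalanceAndAssignment exists to simulate the broker-driven rebalance; check io.confluent.csid.utils.LongPollingMockConsumer for the actual API.

    @Test
    void longPollingMockConsumerSketch() {
        // Sketch only - assumes LongPollingMockConsumer is a drop-in MockConsumer subclass and that the
        // subscribeWithRebalanceAndAssignment helper exists; verify against the real class before relying on this.
        var mockConsumer = new LongPollingMockConsumer<String, String>(OffsetResetStrategy.EARLIEST);

        var options = ParallelConsumerOptions.<String, String>builder()
                .consumer(mockConsumer)
                .build();
        var parallelConsumer = new ParallelEoSStreamProcessor<String, String>(options);
        parallelConsumer.subscribe(of(topic));

        // assumed helper: simulates the rebalance, assignment and beginning offsets that a real broker-backed
        // consumer would provide, replacing the manual rebalance/onPartitionsAssigned/updateBeginningOffsets calls above
        mockConsumer.subscribeWithRebalanceAndAssignment(of(topic), 1);

        addRecords(mockConsumer);

        ConcurrentLinkedQueue<RecordContext<String, String>> records = new ConcurrentLinkedQueue<>();
        parallelConsumer.poll(recordContexts -> recordContexts.forEach(records::add));

        Awaitility.await().untilAsserted(() -> assertThat(records).hasSize(3));
    }

The point of the utility class is that the long-poll behaviour and the rebalance simulation live in one place, so the test body no longer needs to know the details of the Consumer contract.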
