Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements/DI module #398

Merged
merged 7 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

= Change Log

A high level summary of noteworthy changes in each version.

NOTE: Dependency version bumps are not listed here.

// git log --pretty="* %s" 0.3.0.2..HEAD

// only show TOC if this is the root document (not in the README)
Expand All @@ -12,6 +16,18 @@ endif::[]

== Next Version

== v0.5.2.3

=== Improvements

- Adds a very simple Dependency Injection system modeled on Dagger (#398)

== v0.5.2.2

=== Fixes

- Fixes dependency scope for Mockito from compile to test (#376)

== v0.5.2.1

=== Fixes
Expand Down
16 changes: 16 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,10 @@ http://www.apache.org/[Apache®], http://kafka.apache.org/[Apache Kafka], and ht

= Change Log

A high level summary of noteworthy changes in each version.

NOTE: Dependency version bumps are not listed here.

// git log --pretty="* %s" 0.3.0.2..HEAD

// only show TOC if this is the root document (not in the README)
Expand All @@ -1176,6 +1180,18 @@ endif::[]

== Next Version

== v0.5.2.3

=== Improvements

- Adds a very simple Dependency Injection system modeled on Dagger (#398)

== v0.5.2.2

=== Fixes

- Fixes dependency scope for Mockito from compile to test (#376)

== v0.5.2.1

=== Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.PCModule;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -32,6 +33,10 @@ public class ParallelEoSStreamProcessor<K, V> extends AbstractParallelEoSStreamP
*
* @see ParallelConsumerOptions
*/
/**
 * Construct the processor, sourcing collaborators from the supplied {@link PCModule} — allows the DI system
 * (and tests) to inject alternative component implementations.
 *
 * @param newOptions user-supplied configuration
 * @param module     dependency injection module providing the processor's collaborators
 * @see ParallelConsumerOptions
 */
public ParallelEoSStreamProcessor(final ParallelConsumerOptions<K, V> newOptions, PCModule<K, V> module) {
    super(newOptions, module);
}

/**
 * Construct the processor with default wiring (the superclass builds a fresh {@link PCModule} from the options).
 *
 * @param newOptions user-supplied configuration
 * @see ParallelConsumerOptions
 */
public ParallelEoSStreamProcessor(final ParallelConsumerOptions<K, V> newOptions) {
    super(newOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,27 @@ public Exception getFailureCause() {
*/
private boolean lastWorkRequestWasFulfilled = false;

/**
 * Construct with default dependency wiring — delegates to the module-based constructor with a
 * fresh {@link PCModule} built from the given options.
 *
 * @param newOptions user-supplied configuration
 */
protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
    this(newOptions, new PCModule<>(newOptions));
}

/**
* Construct the AsyncConsumer by wrapping this passed in conusmer and producer, which can be configured any which
* way as per normal.
*
* @see ParallelConsumerOptions
*/
public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions, PCModule<K, V> module) {
Objects.requireNonNull(newOptions, "Options must be supplied");

module.setParallelEoSStreamProcessor(this);

log.info("Confluent Parallel Consumer initialise... Options: {}", newOptions);

options = newOptions;
options.validate();

this.dynamicExtraLoadFactor = new DynamicLoadFactor();
this.dynamicExtraLoadFactor = module.dynamicExtraLoadFactor();
this.consumer = options.getConsumer();

checkGroupIdConfigured(consumer);
Expand All @@ -243,14 +249,12 @@ public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptio

workerThreadPool = setupWorkerPool(newOptions.getMaxConcurrency());

this.wm = new WorkManager<K, V>(newOptions, consumer, dynamicExtraLoadFactor, TimeUtils.getClock());

ConsumerManager<K, V> consumerMgr = new ConsumerManager<>(consumer);
this.wm = module.workManager();

this.brokerPollSubsystem = new BrokerPollSystem<>(consumerMgr, wm, this, newOptions);
this.brokerPollSubsystem = module.brokerPoller(this);

if (options.isProducerSupplied()) {
this.producerManager = Optional.of(new ProducerManager<>(options.getProducer(), consumerMgr, this.wm, options));
this.producerManager = Optional.of(module.producerManager());
if (options.isUsingTransactionalProducer())
this.committer = this.producerManager.get();
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ConsumerManager<K, V> {
private final AtomicBoolean pollingBroker = new AtomicBoolean(false);

/**
* Since Kakfa 2.7, multi threaded access to consumer group metadata was blocked, so before and after polling, save
Since Kafka 2.7, multi-threaded access to consumer group metadata was blocked, so before and after polling, save
* a copy of the metadata.
*
* @since 2.7.0
Expand All @@ -49,11 +49,11 @@ ConsumerRecords<K, V> poll(Duration requestedLongPollTimeout) {
commitRequested = false;
}
pollingBroker.set(true);
metaCache = consumer.groupMetadata();
updateMetadataCache();
log.debug("Poll starting with timeout: {}", timeoutToUse);
records = consumer.poll(timeoutToUse);
log.debug("Poll completed normally (after timeout of {}) and returned {}...", timeoutToUse, records.count());
metaCache = consumer.groupMetadata();
updateMetadataCache();
} catch (WakeupException w) {
correctPollWakeups++;
log.debug("Awoken from broker poll");
Expand All @@ -65,6 +65,10 @@ ConsumerRecords<K, V> poll(Duration requestedLongPollTimeout) {
return records;
}

/**
 * Refreshes the cached copy of the consumer group metadata from the consumer.
 * <p>
 * The metadata is cached because multi-threaded access to it was blocked since Kafka 2.7 — see field comment.
 * Protected so test environments can force a cache refresh explicitly.
 */
protected void updateMetadataCache() {
    metaCache = consumer.groupMetadata();
}

/**
* Wakes up the consumer, but only if it's polling.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.confluent.parallelconsumer.internal;
rkolesnev marked this conversation as resolved.
Show resolved Hide resolved

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.Setter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

/**
* Minimum dependency injection system, modled on how Dagger works.
* <p>
* Note: Not using Dagger as PC has a zero dependency policy, and franky it would be overkill for our needs.
*
* @author Antony Stubbs
*/
public class PCModule<K, V> {

protected ParallelConsumerOptions<K, V> optionsInstance;

@Setter
protected AbstractParallelEoSStreamProcessor<K, V> parallelEoSStreamProcessor;

public PCModule(ParallelConsumerOptions<K, V> options) {
this.optionsInstance = options;
}

public ParallelConsumerOptions<K, V> options() {
return optionsInstance;
}

private ProducerManager<K, V> producerManager;

protected ProducerManager<K, V> producerManager() {
if (producerManager == null) {
this.producerManager = new ProducerManager<>(producer(), consumerManager(), workManager(), options());
}
return producerManager;
}

public Producer<K, V> producer() {
return optionsInstance.getProducer();
astubbs marked this conversation as resolved.
Show resolved Hide resolved
}

public Consumer<K, V> consumer() {
return optionsInstance.getConsumer();
}

private ConsumerManager<K, V> consumerManager;

protected ConsumerManager<K, V> consumerManager() {
if (consumerManager == null) {
consumerManager = new ConsumerManager<>(optionsInstance.getConsumer());
}
return consumerManager;
}

private WorkManager<K, V> workManager;

public WorkManager<K, V> workManager() {
if (workManager == null) {
workManager = new WorkManager<>(this, dynamicExtraLoadFactor(), TimeUtils.getClock());
}
return workManager;
}

protected AbstractParallelEoSStreamProcessor<K, V> pc() {
if (parallelEoSStreamProcessor == null) {
parallelEoSStreamProcessor = new ParallelEoSStreamProcessor<>(options(), this);
}
return parallelEoSStreamProcessor;
}

final DynamicLoadFactor dynamicLoadFactor = new DynamicLoadFactor();

protected DynamicLoadFactor dynamicExtraLoadFactor() {
return dynamicLoadFactor;
}

private BrokerPollSystem<K, V> brokerPollSystem;

protected BrokerPollSystem<K, V> brokerPoller(AbstractParallelEoSStreamProcessor<K, V> pc) {
if (brokerPollSystem == null) {
brokerPollSystem = new BrokerPollSystem<>(consumerManager(), workManager(), pc, options());
}
return brokerPollSystem;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.*;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand Down Expand Up @@ -71,23 +68,24 @@ public class WorkManager<K, V> implements ConsumerRebalanceListener {
@Getter(PUBLIC)
private final List<Consumer<WorkContainer<K, V>>> successfulWorkListeners = new ArrayList<>();

public WorkManager(ParallelConsumerOptions<K, V> options, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
this(options, consumer, new DynamicLoadFactor(), TimeUtils.getClock());
/**
 * Constructs the manager with a wall-clock time source.
 * <p>
 * NOTE(review): this creates its own {@link DynamicLoadFactor}, independent of the module's shared
 * instance — confirm that is intended (the module's own workManager() factory passes the shared one).
 */
public WorkManager(PCModule<K, V> module) {
    this(module, new DynamicLoadFactor(), TimeUtils.getClock());
}

/**
* Use a private {@link DynamicLoadFactor}, useful for testing.
*/
public WorkManager(ParallelConsumerOptions<K, V> options, org.apache.kafka.clients.consumer.Consumer<K, V> consumer, Clock clock) {
this(options, consumer, new DynamicLoadFactor(), clock);
/**
 * As {@link #WorkManager(PCModule)}, but with an injectable {@link Clock} — useful for testing.
 * Uses a private {@link DynamicLoadFactor}.
 */
public WorkManager(PCModule<K, V> module, Clock clock) {
    this(module, new DynamicLoadFactor(), clock);
}

public WorkManager(final ParallelConsumerOptions<K, V> newOptions, final org.apache.kafka.clients.consumer.Consumer<K, V> consumer,
final DynamicLoadFactor dynamicExtraLoadFactor, Clock clock) {
this.options = newOptions;
/**
 * Main constructor — wires the manager's collaborators from the {@link PCModule}.
 *
 * @param module                 source of options and the consumer
 * @param dynamicExtraLoadFactor load factor controller to use (injectable for testing)
 * @param clock                  time source (injectable for testing)
 */
public WorkManager(final PCModule<K, V> module,
                   final DynamicLoadFactor dynamicExtraLoadFactor,
                   final Clock clock) {
    this.options = module.options();
    this.dynamicLoadFactor = dynamicExtraLoadFactor;
    this.sm = new ShardManager<>(options, this, clock);
    // consumer is sourced from the module so test environments can substitute mocks
    this.pm = new PartitionStateManager<>(module.consumer(), sm, options, clock);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public abstract class BrokerIntegrationTest<K, V> {

int numPartitions = 1;

@Getter
String topic;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* @see OffsetSimultaneousEncoder#OffsetSimultaneousEncoder
*/
@Slf4j
public
class TransactionMarkersTest extends BrokerIntegrationTest<String, String> {

/**
Expand All @@ -51,7 +52,7 @@ class TransactionMarkersTest extends BrokerIntegrationTest<String, String> {
Producer<String, String> txProducerThree;
Producer<String, String> normalProducer;
Consumer<String, String> consumer;
ParallelEoSStreamProcessor<String, String> pc;
protected ParallelEoSStreamProcessor<String, String> pc;

@BeforeEach
// todo move to super?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.Producer;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Version of the {@link PCModule} in test contexts.
*
* @author Antony Stubbs
*/
/**
 * Version of the {@link PCModule} for use in test contexts — replaces the user-supplied Kafka clients
 * with Mockito mocks so tests never touch a real broker.
 *
 * @author Antony Stubbs
 */
public class PCModuleTestEnv extends PCModule<String, String> {

    public PCModuleTestEnv(ParallelConsumerOptions<String, String> optionsInstance) {
        super(optionsInstance);
        // typed local instead of a raw type; mock(Class) returns the raw mock, hence the suppression
        @SuppressWarnings("unchecked")
        Consumer<String, String> mockConsumer = mock(Consumer.class);
        // group metadata must be non-null for components that read it during construction/polling
        when(mockConsumer.groupMetadata()).thenReturn(new ConsumerGroupMetadata(""));
        // rebuild the options with the mocked clients and swap them into the module
        var override = options().toBuilder()
                .consumer(mockConsumer)
                .producer(mock(Producer.class))
                .build();
        super.optionsInstance = override;
    }

    /** Defaults to empty builder options. */
    public PCModuleTestEnv() {
        this(ParallelConsumerOptions.<String, String>builder().build());
    }

    @Override
    protected ConsumerManager<String, String> consumerManager() {
        ConsumerManager<String, String> consumerManager = super.consumerManager();

        // force update to set cache, otherwise maybe never called (fake consumer)
        consumerManager.updateMetadataCache();

        return consumerManager;
    }
}
Loading