Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Return cached pausedPartitionSet #620

Merged
merged 4 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.7

=== Fixes

* fix: Return cached pausedPartitionSet (#618)

== 0.5.2.6
=== Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public BrokerPollSystem(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm,
private void initMetrics() {
statusGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PC_POLLER_STATUS, this, poller -> poller.runState.getValue());
numPausedPartitionsGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.NUM_PAUSED_PARTITIONS,
this.consumerManager, consumerManager -> consumerManager.paused().size());
this.consumerManager, ConsumerManager::getPausedPartitionSize);
}

public void start(String managedExecutorService) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
Expand All @@ -26,6 +27,7 @@ public class ConsumerManager<K, V> {

private final AtomicBoolean pollingBroker = new AtomicBoolean(false);


/**
* Since Kakfa 2.7, multi-threaded access to consumer group metadata was blocked, so before and after polling, save
* a copy of the metadata.
Expand All @@ -34,6 +36,8 @@ public class ConsumerManager<K, V> {
*/
private ConsumerGroupMetadata metaCache;

private volatile int pausedPartitionSizeCache = 0;

private int erroneousWakups = 0;
private int correctPollWakeups = 0;
private int noWakeups = 0;
Expand All @@ -49,11 +53,11 @@ ConsumerRecords<K, V> poll(Duration requestedLongPollTimeout) {
commitRequested = false;
}
pollingBroker.set(true);
updateMetadataCache();
updateCache();
log.debug("Poll starting with timeout: {}", timeoutToUse);
records = consumer.poll(timeoutToUse);
log.debug("Poll completed normally (after timeout of {}) and returned {}...", timeoutToUse, records.count());
updateMetadataCache();
updateCache();
} catch (WakeupException w) {
correctPollWakeups++;
log.debug("Awoken from broker poll");
Expand All @@ -65,8 +69,9 @@ ConsumerRecords<K, V> poll(Duration requestedLongPollTimeout) {
return records;
}

protected void updateMetadataCache() {
protected void updateCache() {
metaCache = consumer.groupMetadata();
pausedPartitionSizeCache = consumer.paused().size();
}

/**
Expand Down Expand Up @@ -133,6 +138,10 @@ public Set<TopicPartition> paused() {
return consumer.paused();
}

public int getPausedPartitionSize() {
return pausedPartitionSizeCache;
}

public void resume(final Set<TopicPartition> pausedTopics) {
consumer.resume(pausedTopics);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected ConsumerManager<String, String> consumerManager() {
ConsumerManager<String, String> consumerManager = super.consumerManager();

// force update to set cache, otherwise maybe never called (fake consumer)
consumerManager.updateMetadataCache();
consumerManager.updateCache();

return consumerManager;
}
Expand Down