
Commit 20f8b27

add multiple caches for accelerating available container count calculation (#667)

Refactor of the available-work calculation, eliminating per-shard scanning.
sangreal authored Feb 22, 2024
1 parent de38055 commit 20f8b27
Showing 8 changed files with 153 additions and 41 deletions.
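
The core of the change: instead of scanning every entry in a shard to count work available for selection, each ProcessingShard keeps an AtomicLong cache (availableWorkContainerCnt) that is incremented when work is added or fails, and decremented when work is taken or removed; the retryQueue is accounted for separately in ShardManager. A minimal sketch of the counting pattern, simplified from the diff below (generic value type and all partition/retry handling omitted; putIfAbsent stands in for the diff's containsKey/put pair):

    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.atomic.AtomicLong;

    class ShardCountSketch {
        private final ConcurrentSkipListMap<Long, Object> entries = new ConcurrentSkipListMap<>();
        private final AtomicLong availableWorkContainerCnt = new AtomicLong(0);

        void addWorkContainer(long offset, Object workContainer) {
            // only count the container if it was actually inserted
            if (entries.putIfAbsent(offset, workContainer) == null) {
                availableWorkContainerCnt.incrementAndGet();
            }
        }

        void onWorkTaken(int taken) {
            // taken work is in flight and no longer available; the clamp guards a benign race
            availableWorkContainerCnt.getAndAdd(-taken);
            if (availableWorkContainerCnt.get() < 0L) {
                availableWorkContainerCnt.set(0L);
            }
        }

        long getCountOfWorkAwaitingSelection() {
            return availableWorkContainerCnt.get(); // O(1): no scan over entries
        }
    }

Compared with the previous entries.values().stream().filter(WorkContainer::isAvailableToTakeAsWork).count(), the read path becomes constant time; the cost is that every mutation site must keep the counter consistent, which is what the rest of this diff does.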
7 changes: 7 additions & 0 deletions CHANGELOG.adoc
@@ -13,6 +13,13 @@ NOTE:: Dependency version bumps are not listed here.
ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.9

=== Improvements
* improvement: add multiple caches for accelerating available container count calculation (#667)


== 0.5.2.8

=== Fixes
7 changes: 7 additions & 0 deletions README.adoc
@@ -1533,6 +1533,13 @@ NOTE:: Dependency version bumps are not listed here.
ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.9

=== Improvements
* improvement: add multiple caches for accelerating available container count calculation (#667)


== 0.5.2.8

=== Fixes
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.csid.utils.SupplierUtils;
@@ -1407,20 +1407,8 @@ public void requestCommitAsap() {
notifySomethingToDo();
}

private boolean isCommandedToCommit() {
synchronized (commitCommand) {
return this.commitCommand.get();
}
}

private void clearCommitCommand() {
synchronized (commitCommand) {
if (commitCommand.get()) {
log.debug("Command to commit asap received, clearing");
this.commitCommand.set(false);
}
}
}


private boolean isTransactionCommittingInProgress() {
return options.isUsingTransactionCommitMode() &&
@@ -1447,4 +1435,20 @@ public void resumeIfPaused() {
log.debug("Skipping transition of parallel consumer to state running. Current state is {}.", this.state);
}
}

private boolean isCommandedToCommit() {
synchronized (commitCommand) {
return this.commitCommand.get();
}
}

private void clearCommitCommand() {
synchronized (commitCommand) {
if (commitCommand.get()) {
log.debug("Command to commit asap received, clearing");
this.commitCommand.set(false);
}
}
}

}
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
@@ -14,6 +14,7 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.confluent.csid.utils.BackportUtils.toSeconds;
@@ -44,6 +45,7 @@ public class ProcessingShard<K, V> {
@Getter
private final NavigableMap<Long, WorkContainer<K, V>> entries = new ConcurrentSkipListMap<>();


@Getter(PRIVATE)
private final ShardKey key;

@@ -53,17 +55,15 @@ public class ProcessingShard<K, V> {

private final RateLimiter slowWarningRateLimit = new RateLimiter(5);

public boolean workIsWaitingToBeProcessed() {
return entries.values().parallelStream()
.anyMatch(kvWorkContainer -> kvWorkContainer.isAvailableToTakeAsWork());
}
private final AtomicLong availableWorkContainerCnt = new AtomicLong(0);

public void addWorkContainer(WorkContainer<K, V> wc) {
long key = wc.offset();
if (entries.containsKey(key)) {
log.debug("Entry for {} already exists in shard queue, dropping record", wc);
} else {
entries.put(key, wc);
availableWorkContainerCnt.incrementAndGet();
}
}

@@ -72,15 +72,18 @@ public void onSuccess(WorkContainer<?, ?> wc) {
entries.remove(wc.offset());
}

public void onFailure() {
// increase the available count first, so that the retry-expiry calculation later accounts for it
availableWorkContainerCnt.incrementAndGet();
}


public boolean isEmpty() {
return entries.isEmpty();
}

public long getCountOfWorkAwaitingSelection() {
return entries.values().stream()
// todo missing pm.isBlocked(topicPartition) ?
.filter(WorkContainer::isAvailableToTakeAsWork)
.count();
return availableWorkContainerCnt.get();
}

public long getCountOfWorkTracked() {
@@ -94,22 +97,35 @@ public long getCountWorkInFlight() {
}

public WorkContainer<K, V> remove(long offset) {
// called from the onPartitionsRemoved callback; decrement the available work count for the revoked partition
WorkContainer<K, V> toRemovedWorker = entries.get(offset);
if (toRemovedWorker != null && toRemovedWorker.isAvailableToTakeAsWork()) {
dcrAvailableWorkContainerCntByDelta(1);
}
return entries.remove(offset);
}



// remove stale WorkContainers; otherwise, when the partition is reassigned, the stale messages will:
// 1. block new work containers from being picked up and processed
// 2. cause the consumer to pause consuming new messages indefinitely
public boolean removeStaleWorkContainersFromShard() {
return this.entries.entrySet()
public List<WorkContainer<K, V>> removeStaleWorkContainersFromShard() {
List<WorkContainer<K, V>> staleContainers = new ArrayList<>();
this.entries.entrySet()
.removeIf(entry -> {
WorkContainer<K, V> workContainer = entry.getValue();
return isWorkContainerStale(workContainer);
boolean isStale = isWorkContainerStale(workContainer);
if (isStale) {
// decrease availableWorkContainerCnt and collect the stale container
dcrAvailableWorkContainerCntByDelta(1);
staleContainers.add(workContainer);
}
return isStale;
});
return staleContainers;
}


ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
log.trace("Looking for work on shardQueueEntry: {}", getKey());

@@ -123,6 +139,7 @@ ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
if (pm.couldBeTakenAsWork(workContainer)) {
if (workContainer.isAvailableToTakeAsWork()) {
log.trace("Taking {} as work", workContainer);

workContainer.onQueueingForExecution();
workTaken.add(workContainer);
} else {
@@ -154,6 +171,8 @@ ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {

logSlowWork(slowWork);

dcrAvailableWorkContainerCntByDelta(workTaken.size());

return workTaken;
}

@@ -200,4 +219,12 @@ private boolean isOrderRestricted() {
private boolean isWorkContainerStale(WorkContainer<K, V> workContainer) {
return pm.getPartitionState(workContainer).checkIfWorkIsStale(workContainer);
}

private void dcrAvailableWorkContainerCntByDelta(int delta) {
availableWorkContainerCnt.getAndAdd(-delta);
// clamp at zero in case of a race condition
if (availableWorkContainerCnt.get() < 0L) {
availableWorkContainerCnt.set(0L);
}
}
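// A lock-free alternative to the check-then-set clamp above (a sketch, not
// part of this commit) would fold the subtraction and the floor at zero into
// a single atomic update, closing the brief window in which a concurrent
// reader can observe a negative count:
//   availableWorkContainerCnt.updateAndGet(v -> Math.max(0L, v - delta));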
}
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.csid.utils.LoopingResumingIterator;
@@ -22,7 +22,11 @@
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
@@ -132,15 +136,16 @@ ShardKey computeShardKey(ConsumerRecord<?, ?> wc) {
* @return Work ready in the processing shards, awaiting selection as work to do
*/
public long getNumberOfWorkQueuedInShardsAwaitingSelection() {
// total available container count - (retry containers that are still pending, i.e. not yet due)
// => all_available_count - (retryCnt - all_expired_retry_cnt)

return processingShards.values().stream()
.mapToLong(ProcessingShard::getCountOfWorkAwaitingSelection)
.sum();
.sum() - retryQueue.size() + getNumberOfFailedWorkReadyToBeRetried();
}
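// Worked example for the formula above (illustrative numbers, not from the
// commit): with 10 available containers summed across shards and 4 entries in
// retryQueue, of which 1 has already passed its retry delay:
//   10 - 4 + 1 = 7 containers are ready for selection.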

public boolean workIsWaitingToBeProcessed() {
Collection<ProcessingShard<K, V>> allShards = processingShards.values();
return allShards.parallelStream()
.anyMatch(ProcessingShard::workIsWaitingToBeProcessed);
return getNumberOfWorkQueuedInShardsAwaitingSelection() > 0L;
}

/**
@@ -225,6 +230,14 @@ public void onSuccess(WorkContainer<?, ?> wc) {
public void onFailure(WorkContainer<?, ?> wc) {
log.debug("Work FAILED");
this.retryQueue.add(wc);

var key = computeShardKey(wc);
var shardOptional = getShard(key);

if (shardOptional.isPresent()) {
shardOptional.get().onFailure();
}

}

/**
@@ -273,14 +286,13 @@ public List<WorkContainer<K, V>> getWorkIfAvailable(final int requestedMaxWorkTo
return workFromAllShards;
}

// remove stale containers from both processingShards and retryQueue
public boolean removeStaleContainers() {
boolean removed = processingShards.values().stream()
return processingShards.values().stream()
.map(ProcessingShard::removeStaleWorkContainersFromShard)
.anyMatch(res -> res.equals(true));
if (removed) {
log.debug("there are stale work containers removed");
}
return removed;
.flatMap(Collection::stream)
.map(retryQueue::remove)
.findAny().isPresent();
}
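// Returning the stale containers from each shard (rather than a boolean, as
// before) lets this method purge the same containers from retryQueue as well;
// otherwise they would be subtracted a second time in
// getNumberOfWorkQueuedInShardsAwaitingSelection.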

private void updateResumePoint(Optional<Map.Entry<ShardKey, ProcessingShard<K, V>>> lastShard) {
@@ -298,4 +310,21 @@ private void initMetrics() {
numberOfShardsGauge = pcMetrics.gaugeFromMetricDef(PCMetricsDef.NUMBER_OF_SHARDS,
this, shardManager -> shardManager.processingShards.keySet().size());
}

// count the expired (retry-due) items in the retryQueue
private long getNumberOfFailedWorkReadyToBeRetried() {
long count = 0;
for (WorkContainer<?, ?> workContainer : retryQueue) {
// when the poller checks, a container whose delay has passed is considered ready to be retried; there are two scenarios:
// 1. the container has not yet been selected, so it is not in flight and should be counted
// 2. the container has been selected and is in flight, but it has already been subtracted from availableWorkContainerCnt, so it should also be counted
if (workContainer.isDelayPassed()) {
count++;
} else {
// early stop since retryQueue is sorted by retryDueAt
break;
}
}
return count;
}
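// The early break above relies on retryQueue iterating in retryDueAt order
// (e.g. a set ordered by a retryDueAt comparator, per the ConcurrentSkipListSet
// import). With an unordered collection, the whole queue would need scanning
// instead, e.g.:
//   retryQueue.stream().filter(WorkContainer::isDelayPassed).count()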
}
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.parallelconsumer.PollContextInternal;
Expand Down
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
Expand Down
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import com.google.common.truth.Truth;
@@ -131,6 +131,7 @@ private ConsumerRecord<String, String> makeRec(String value, String key, int par
return stringStringConsumerRecord;
}


@ParameterizedTest
@EnumSource
void basic(ParallelConsumerOptions.ProcessingOrder order) {
@@ -738,4 +739,41 @@ void starvation() {

}

/**
* Tests the available work container count bookkeeping
*/
@Test
void testAvailableWorkerCnt() {
ParallelConsumerOptions<?, ?> build = ParallelConsumerOptions.builder().ordering(PARTITION).build();
setupWorkManager(build);
// sanity
assertThat(wm.getOptions().getOrdering()).isEqualTo(PARTITION);

registerSomeWork();

int total = 3;
int maxWorkToGet = 2;

var works = wm.getWorkIfAvailable(maxWorkToGet);

assertThat(wm.getNumberOfWorkQueuedInShardsAwaitingSelection()).isEqualTo(total - works.size());

// fail the work
var wc = works.get(0);
fail(wc);


// advance clock to make delay pass
advanceClockByDelay();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// work should now be ready to take
works = wm.getWorkIfAvailable(maxWorkToGet);
assertThat(wm.getNumberOfWorkQueuedInShardsAwaitingSelection()).isEqualTo(total - works.size());

}

}
