add multiple caches for accelerating available container count calculation #667

Merged
69 commits, merged Feb 22, 2024
Changes from 64 commits
Commits
6dd8b95
enable retry queue and let retry records in desired order
sangreal Sep 8, 2023
22fd161
update license header
sangreal Sep 8, 2023
1e388fc
Merge remote-tracking branch 'upstream/master' into fix/enable-retry-…
sangreal Sep 8, 2023
a38a022
change to poll first record
sangreal Sep 8, 2023
3dd0b0d
add null check
sangreal Sep 9, 2023
1176e74
add retry handler and working thread to handle adding retry to mailbox
sangreal Sep 9, 2023
615d43f
add license header
sangreal Sep 9, 2023
f0afad5
remove unused code
sangreal Sep 9, 2023
6e4b457
some improv
sangreal Sep 10, 2023
24b8d1c
implement new map for available containers
sangreal Sep 11, 2023
8b6c307
change to calc count
sangreal Sep 14, 2023
c1decfc
remove unused code
sangreal Sep 14, 2023
e4cf445
fix worker calculation and add null protection
sangreal Sep 14, 2023
6d41d7e
remove unused code
sangreal Sep 15, 2023
4f72eff
remove unused code
sangreal Sep 15, 2023
b5c285f
tmp ignore
sangreal Sep 15, 2023
5631dbb
tmp ignore
sangreal Sep 15, 2023
53a5a5b
fix pcmetrics test
sangreal Sep 16, 2023
6bd409c
1. more refactor 2. add more comments
sangreal Sep 16, 2023
75f9399
fix retryhandler loop
sangreal Sep 24, 2023
3b91558
fix header
sangreal Sep 24, 2023
4d6e51a
fix comment in ut
sangreal Sep 24, 2023
852ea7d
update comment
sangreal Sep 24, 2023
7d6d110
add dueMillis to avoid io / lock
sangreal Sep 26, 2023
85067b6
Merge remote-tracking branch 'upstream/master' into fix/enable-retry-…
sangreal Oct 11, 2023
a01e56f
add more comments
sangreal Oct 11, 2023
b7f6dc9
update changelog
sangreal Oct 11, 2023
da26151
add retryItemCnt
sangreal Oct 11, 2023
7f2dd42
check dupe remove
sangreal Oct 11, 2023
297a597
address comments
sangreal Oct 11, 2023
224f4ec
remove retryHandler reference
sangreal Oct 20, 2023
34f9abc
fix conflict
sangreal Nov 19, 2023
850e496
Merge branch 'master' into fix/enable-retry-queue
eddyv Nov 20, 2023
2ce5a64
refine comments
sangreal Jan 20, 2024
319d9da
remove thread pool
sangreal Jan 20, 2024
b7128da
1. remove retryhandler 2. add expire count 3. fix test
sangreal Jan 21, 2024
253a23e
update changelog
sangreal Jan 21, 2024
719dc89
remove unused import
sangreal Jan 21, 2024
f0cf8d9
Merge branch 'master' into fix/enable-retry-queue
rkolesnev Jan 23, 2024
06b4186
update license header
sangreal Feb 8, 2024
36ea7cb
use https for repositories (#683)
rkolesnev Jan 23, 2024
0c23265
Make PC message buffer size configurable (#682)
rkolesnev Jan 24, 2024
dc2dfe9
build(deps): bump io.projectreactor:reactor-core from 3.5.9 to 3.6.2 …
dependabot[bot] Jan 25, 2024
6a52162
build(deps): bump org.apache.maven.plugins:maven-dependency-plugin (#…
dependabot[bot] Jan 25, 2024
af36c04
build(deps): bump io.micrometer:micrometer-registry-prometheus (#689)
dependabot[bot] Jan 25, 2024
fb1ba48
build(deps-dev): bump flogger.version from 0.7.4 to 0.8 (#688)
dependabot[bot] Jan 25, 2024
ca63d8d
build(deps): bump vertx.version from 4.4.4 to 4.5.1 (#687)
dependabot[bot] Jan 25, 2024
65b671c
license header updates
rkolesnev Jan 25, 2024
9f76240
build(deps): bump org.apache.maven.plugins:maven-site-plugin (#696)
dependabot[bot] Jan 25, 2024
dbf4602
build(deps): bump org.apache.maven.plugins:maven-clean-plugin (#695)
dependabot[bot] Jan 25, 2024
2b1dd6e
build(deps): bump org.apache.maven.plugins:maven-enforcer-plugin (#694)
dependabot[bot] Jan 25, 2024
e182325
build(deps-dev): bump org.postgresql:postgresql from 42.6.0 to 42.7.1…
dependabot[bot] Jan 25, 2024
d5d0669
build(deps): bump truth.version from 1.1.5 to 1.3.0 (#692)
dependabot[bot] Jan 25, 2024
04ab8b0
build(deps): bump junit.version from 5.10.0-RC1 to 5.10.1 (#701)
dependabot[bot] Jan 25, 2024
d61cb54
build(deps): bump com.github.tomakehurst:wiremock-jre8 (#700)
dependabot[bot] Jan 25, 2024
8c16688
Deps/update deps (#704)
rkolesnev Jan 25, 2024
1efd83d
Deps/update deps 2 (#710)
rkolesnev Jan 25, 2024
4482f28
update licenses in POM files
rkolesnev Jan 25, 2024
da4db27
[maven-release-plugin] prepare release 0.5.2.8
rkolesnev Jan 25, 2024
c73d9b8
[maven-release-plugin] prepare for next development iteration
rkolesnev Jan 25, 2024
03064c4
Merge branch 'master' into fix/enable-retry-queue
sangreal Feb 8, 2024
78e9256
add readme and fix issue
sangreal Feb 8, 2024
8573633
update CHANGLOG to 0.5.2.9
sangreal Feb 8, 2024
a805140
remove unused method and updates comment
sangreal Feb 9, 2024
a99b574
1. remove expiredRetryContainerCnt 2. use iterator to get expired cnt
sangreal Feb 13, 2024
2eee903
add comment and remove unused code
sangreal Feb 13, 2024
e51f169
fix according to comments
sangreal Feb 13, 2024
bf84f5d
remove unused code and fix comments
sangreal Feb 14, 2024
a66bda4
refactor to remove stale container from retry queue
sangreal Feb 15, 2024
7 changes: 7 additions & 0 deletions CHANGELOG.adoc
@@ -13,6 +13,13 @@ NOTE:: Dependency version bumps are not listed here.
ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.9

=== Improvements
* improvement: add multiple caches for accelerating available container count calculation (#667)


== 0.5.2.8

=== Fixes
3 changes: 3 additions & 0 deletions README.adoc
@@ -1535,6 +1535,9 @@ toc::[]
endif::[]
== 0.5.2.8

=== Improvements
* improvement: add multiple caches for accelerating available container count calculation (#667)

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
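To make the changelog entry above concrete: the core idea of the PR is to stop recounting available work containers with a stream scan on every scheduling pass, and instead keep running counters that are updated as containers are added, taken, failed, and completed. The following is a minimal, hypothetical sketch of that trade-off; the class and method names are invented for illustration and are not part of the library's API.

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration: a queue that caches its "available" count
// instead of recounting its entries on every query.
class CountingQueue<T> {
    private final Map<Long, T> entries = new ConcurrentSkipListMap<>();
    private final AtomicLong availableCount = new AtomicLong(0);

    void add(long offset, T item) {
        if (entries.putIfAbsent(offset, item) == null) {
            availableCount.incrementAndGet(); // keep the cache in step with the map
        }
    }

    void markTaken(int taken) {
        // decrement by however many items were handed out as work
        if (availableCount.addAndGet(-taken) < 0) {
            availableCount.set(0); // defensive clamp, mirroring the PR
        }
    }

    long available() {
        return availableCount.get(); // O(1) read instead of an O(n) stream count
    }
}

The real change applies the same idea per ProcessingShard and aggregates the per-shard counters in ShardManager, where the count was previously recomputed with a filtered stream on every call.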
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.csid.utils.SupplierUtils;
@@ -1407,20 +1407,8 @@ public void requestCommitAsap() {
notifySomethingToDo();
}

private boolean isCommandedToCommit() {
synchronized (commitCommand) {
return this.commitCommand.get();
}
}

private void clearCommitCommand() {
synchronized (commitCommand) {
if (commitCommand.get()) {
log.debug("Command to commit asap received, clearing");
this.commitCommand.set(false);
}
}
}


private boolean isTransactionCommittingInProgress() {
return options.isUsingTransactionCommitMode() &&
@@ -1447,4 +1435,20 @@ public void resumeIfPaused() {
log.debug("Skipping transition of parallel consumer to state running. Current state is {}.", this.state);
}
}

private boolean isCommandedToCommit() {
synchronized (commitCommand) {
return this.commitCommand.get();
}
}

private void clearCommitCommand() {
synchronized (commitCommand) {
if (commitCommand.get()) {
log.debug("Command to commit asap received, clearing");
this.commitCommand.set(false);
}
}
}

}
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
@@ -14,6 +14,7 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.confluent.csid.utils.BackportUtils.toSeconds;
@@ -44,6 +45,7 @@ public class ProcessingShard<K, V> {
@Getter
private final NavigableMap<Long, WorkContainer<K, V>> entries = new ConcurrentSkipListMap<>();


@Getter(PRIVATE)
private final ShardKey key;

@@ -53,9 +55,12 @@ public class ProcessingShard<K, V> {

private final RateLimiter slowWarningRateLimit = new RateLimiter(5);

private AtomicLong availableWorkContainerCnt = new AtomicLong(0);

private AtomicLong expiredRetryContainerCnt = new AtomicLong(0);

public boolean workIsWaitingToBeProcessed() {
return entries.values().parallelStream()
.anyMatch(kvWorkContainer -> kvWorkContainer.isAvailableToTakeAsWork());
return availableWorkContainerCnt.get() > 0L;
}

public void addWorkContainer(WorkContainer<K, V> wc) {
@@ -64,23 +69,36 @@ public void addWorkContainer(WorkContainer<K, V> wc) {
log.debug("Entry for {} already exists in shard queue, dropping record", wc);
} else {
entries.put(key, wc);
availableWorkContainerCnt.incrementAndGet();
}
}

public void onSuccess(WorkContainer<?, ?> wc) {
// remove work from shard's queue
entries.remove(wc.offset());

// decrement the expired-retry counter only if this container had previously failed (i.e. it was a retry)
if (wc.getLastFailedAt().isPresent()) {
expiredRetryContainerCnt.decrementAndGet();
}
}

public void onFailure() {
// increment the available count up front; the expired-retry accounting happens later, when the retry is taken as work
availableWorkContainerCnt.incrementAndGet();
}


public boolean isEmpty() {
return entries.isEmpty();
}

public long getCountOfWorkAwaitingSelection() {
return entries.values().stream()
// todo missing pm.isBlocked(topicPartition) ?
.filter(WorkContainer::isAvailableToTakeAsWork)
.count();
return availableWorkContainerCnt.get();
}

public long getExpiredRetryContainerCnt() {
return expiredRetryContainerCnt.get();
}

public long getCountOfWorkTracked() {
@@ -94,10 +112,16 @@ public long getCountWorkInFlight() {
}

public WorkContainer<K, V> remove(long offset) {
// called from the onPartitionsRemoved callback; decrement the available work container count for the revoked partition
WorkContainer<K, V> toRemovedWorker = entries.get(offset);
if (toRemovedWorker != null && toRemovedWorker.isAvailableToTakeAsWork()) {
dcrAvailableWorkContainerCntByDelta(1);
}
return entries.remove(offset);
}



// remove stale WorkContainers, otherwise when the partition is reassigned the stale messages will:
// 1. block new work containers from being picked up and processed
// 2. cause the consumer to pause consuming new messages indefinitely
@@ -109,7 +133,6 @@ public boolean removeStaleWorkContainersFromShard() {
});
}


ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
log.trace("Looking for work on shardQueueEntry: {}", getKey());

@@ -123,6 +146,11 @@ ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
if (pm.couldBeTakenAsWork(workContainer)) {
if (workContainer.isAvailableToTakeAsWork()) {
log.trace("Taking {} as work", workContainer);

// only increment expiredRetryContainerCnt when this is a retry whose delay has expired; fresh containers were already counted when they were added
if (workContainer.isDelayExistsExpired()) {
expiredRetryContainerCnt.incrementAndGet();
}
workContainer.onQueueingForExecution();
workTaken.add(workContainer);
} else {
@@ -154,6 +182,8 @@ ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {

logSlowWork(slowWork);

dcrAvailableWorkContainerCntByDelta(workTaken.size());

return workTaken;
}

@@ -200,4 +230,12 @@ private boolean isOrderRestricted() {
private boolean isWorkContainerStale(WorkContainer<K, V> workContainer) {
return pm.getPartitionState(workContainer).checkIfWorkIsStale(workContainer);
}

private void dcrAvailableWorkContainerCntByDelta(int ByNum) {
availableWorkContainerCnt.getAndAdd(-1 * ByNum);
// clamp at zero in case a race condition drives the counter negative
if (availableWorkContainerCnt.get() < 0L) {
availableWorkContainerCnt.set(0L);
}
}
}
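Taken together, the ProcessingShard changes above give availableWorkContainerCnt a simple lifecycle: it is incremented when a container is added or fails (a failed container becomes selectable again once its retry delay passes), decremented by the number of containers handed out in getWorkIfAvailable or removed on partition revocation, and clamped at zero if a race drives it negative. The sketch below is a hypothetical restatement of that bookkeeping, not the real class; the method names are simplified.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical restatement of the shard-level counters introduced in this PR.
class ShardCounters {
    private final AtomicLong available = new AtomicLong(0);      // containers ready to be taken as work
    private final AtomicLong expiredRetries = new AtomicLong(0); // retries whose delay has already passed

    void onAdded()            { available.incrementAndGet(); }      // new record queued on the shard
    void onFailed()           { available.incrementAndGet(); }      // will be retried, so it counts as available again
    void onRetryTakenAsWork() { expiredRetries.incrementAndGet(); } // a due retry was handed out as work
    void onRetrySucceeded()   { expiredRetries.decrementAndGet(); } // the retried container finally completed

    void onTakenOrRemoved(int n) {
        available.addAndGet(-n);
        if (available.get() < 0) {
            available.set(0); // defensive clamp, as in dcrAvailableWorkContainerCntByDelta
        }
    }

    long availableCount()      { return available.get(); }
    long expiredRetryCount()   { return expiredRetries.get(); }
}

The clamp is a deliberate trade-off: a transient undercount is preferred over ever reporting a negative number of available containers.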
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.csid.utils.LoopingResumingIterator;
@@ -22,7 +22,11 @@
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
@@ -92,6 +96,9 @@ public class ShardManager<K, V> {
@Getter(AccessLevel.PACKAGE) // visible for testing
private final NavigableSet<WorkContainer<?, ?>> retryQueue = new TreeSet<>(retryQueueWorkContainerComparator);

@Getter
private final AtomicLong retryItemCnt = new AtomicLong(0);

/**
* Iteration resume point, to ensure fairness (prevent shard starvation) when we can't process messages from every
* shard.
@@ -132,9 +139,13 @@ ShardKey computeShardKey(ConsumerRecord<?, ?> wc) {
* @return Work ready in the processing shards, awaiting selection as work to do
*/
public long getNumberOfWorkQueuedInShardsAwaitingSelection() {
// available container count minus the retry containers that are not yet due:
// all_available_count - (retryCnt - all_expired_retry_cnt) => all_available_count + all_expired_retry_cnt - retryCnt

return processingShards.values().stream()
.mapToLong(ProcessingShard::getCountOfWorkAwaitingSelection)
.sum();
.mapToLong(processingShard ->
processingShard.getCountOfWorkAwaitingSelection() + processingShard.getExpiredRetryContainerCnt())
.sum() - retryItemCnt.get();
}

public boolean workIsWaitingToBeProcessed() {
Expand Down Expand Up @@ -170,7 +181,9 @@ private void removeWorkFromShardFor(ConsumerRecord<K, V> consumerRecord) {
WorkContainer<K, V> removedWC = shard.remove(consumerRecord.offset());

// remove if in retry queue
this.retryQueue.remove(removedWC);
if (this.retryQueue.remove(removedWC)) {
retryItemCnt.decrementAndGet();
}

// remove the shard if empty
removeShardIfEmpty(shardKey);
@@ -204,7 +217,9 @@ void removeShardIfEmpty(ShardKey key) {

public void onSuccess(WorkContainer<?, ?> wc) {
// remove from the retry queue if it's contained
this.retryQueue.remove(wc);
if (this.retryQueue.remove(wc)) {
retryItemCnt.decrementAndGet();
}

// remove from processing queues
var key = computeShardKey(wc);
@@ -224,7 +239,16 @@ public void onSuccess(WorkContainer<?, ?> wc) {
*/
public void onFailure(WorkContainer<?, ?> wc) {
log.debug("Work FAILED");
this.retryQueue.add(wc);
if (this.retryQueue.add(wc)) {
retryItemCnt.incrementAndGet();
}
var key = computeShardKey(wc);
var shardOptional = getShard(key);

if (shardOptional.isPresent()) {
shardOptional.get().onFailure();
}

}

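The aggregate in getNumberOfWorkQueuedInShardsAwaitingSelection follows the arithmetic spelled out in its comment: sum each shard's available count and expired-retry count, then subtract the total number of entries in the retry queue, so that retries whose delay has not yet passed are not reported as selectable. Below is a small, hypothetical numeric check of that formula; the values are invented for illustration.

// Hypothetical numbers, only to exercise the formula from the comment above:
// awaiting = sum over shards of (available + expiredRetries) - retryQueueSize
public class AwaitingSelectionFormula {
    public static void main(String[] args) {
        long shardAAvailable = 5, shardAExpiredRetries = 1;
        long shardBAvailable = 2, shardBExpiredRetries = 0;
        long retryQueueSize = 2; // one retry already due and taken, one still waiting out its delay

        long awaiting = (shardAAvailable + shardAExpiredRetries)
                + (shardBAvailable + shardBExpiredRetries)
                - retryQueueSize;

        System.out.println(awaiting); // prints 6: the not-yet-due retry is excluded
    }
}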
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.parallelconsumer.PollContextInternal;
@@ -109,6 +109,18 @@ public boolean isDelayPassed() {
return negative;
}

/**
* @return whether this container is a retry whose delay has already expired and is due for execution
*/
public boolean isDelayExistsExpired() {
if (!lastFailedAt.isPresent()) {
return false;
}
Duration delay = getDelayUntilRetryDue();
return delay.isNegative() || delay.isZero();
}

/**
* @return time until it should be retried
*/
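The new isDelayExistsExpired method above combines two conditions: the container must have failed at least once (lastFailedAt is present) and its retry delay must have fully elapsed. A minimal stand-alone restatement of that predicate follows, with the container reduced to just the fields the check needs; the names are hypothetical and not the real WorkContainer API.

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical restatement of the "retry is due" predicate.
class RetryDueCheck {
    static boolean isRetryDue(Optional<Instant> lastFailedAt, Duration retryDelay, Instant now) {
        if (!lastFailedAt.isPresent()) {
            return false; // never failed, so this is not a retry at all
        }
        Duration remaining = Duration.between(now, lastFailedAt.get().plus(retryDelay));
        return remaining.isNegative() || remaining.isZero(); // the delay has fully elapsed
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        // failed 10 seconds ago with a 5 second retry delay -> due for retry
        System.out.println(isRetryDue(Optional.of(now.minusSeconds(10)), Duration.ofSeconds(5), now)); // true
        // never failed -> not a retry
        System.out.println(isRetryDue(Optional.empty(), Duration.ofSeconds(5), now)); // false
    }
}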
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import com.google.common.truth.Truth;
@@ -131,6 +131,7 @@ private ConsumerRecord<String, String> makeRec(String value, String key, int par
return stringStringConsumerRecord;
}


@ParameterizedTest
@EnumSource
void basic(ParallelConsumerOptions.ProcessingOrder order) {
@@ -738,4 +739,41 @@ void starvation() {

}

/**
* Tests the available work container count bookkeeping
*/
@Test
void testAvailableWorkerCnt() {
ParallelConsumerOptions<?, ?> build = ParallelConsumerOptions.builder().ordering(PARTITION).build();
setupWorkManager(build);
// sanity
assertThat(wm.getOptions().getOrdering()).isEqualTo(PARTITION);

registerSomeWork();

int total = 3;
int maxWorkToGet = 2;

var works = wm.getWorkIfAvailable(maxWorkToGet);

assertThat(wm.getNumberOfWorkQueuedInShardsAwaitingSelection()).isEqualTo(total - works.size());

// fail the work
var wc = works.get(0);
fail(wc);


// advance clock to make delay pass
advanceClockByDelay();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// work should now be ready to take
works = wm.getWorkIfAvailable(maxWorkToGet);
assertThat(wm.getNumberOfWorkQueuedInShardsAwaitingSelection()).isEqualTo(total - works.size());

}

}