Skip to content

Commit

Permalink
Perform max.poll.interval.ms considering max.poll.records
Browse files Browse the repository at this point in the history
Without considering `max.poll.records` in the calculation of
the `max.poll.interval.ms` we might cause unnecessary rebalances
since without this patch slow consumers might trigger a rebalance
at every handled record (when their processing time ~= timeout).

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Jul 28, 2022
1 parent d536fe1 commit 581fd58
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public AbstractVerticle get(final DataPlaneContract.Resource resource, final Dat
egress.getEgressConfig() :
resource.getEgressConfig();

consumerConfigs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxProcessingTimeMs(egressConfig));
final var maxProcessingTime = maxProcessingTimeMs(consumerConfigs, egressConfig);
consumerConfigs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxProcessingTime);

final var egressSubscriberSender = createConsumerRecordSender(
vertx,
Expand Down Expand Up @@ -367,20 +368,23 @@ private static boolean isResourceReferenceDefined(DataPlaneContract.Reference re
return resource != null && !resource.getNamespace().isBlank() && !resource.getName().isBlank();
}

private static long maxProcessingTimeMs(final EgressConfig egressConfig) {
private static long maxProcessingTimeMs(final HashMap<String, Object> consumerConfigs, final EgressConfig egressConfig) {
final var mpr = consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
final long maxPollRecords = mpr == null ? /* default max.poll.records */ 500L : Long.parseLong(mpr.toString());
final var retryPolicy = WebClientCloudEventSender.computeRetryPolicy(egressConfig);
final var retry = egressConfig.getRetry();
final var timeout = egressConfig.getTimeout();

var maxProcessingTime = 0;
long maxProcessingTimeForSingleRecord = 0L;
for (int i = 1; i <= retry; i++) {
maxProcessingTime += timeout + retryPolicy.apply(i);
maxProcessingTimeForSingleRecord += timeout + retryPolicy.apply(i);
}
// In addition, we add some seconds as overhead for each retry.
final var overhead = 10_000 * retry;
maxProcessingTime += overhead;
// 2 times since we consider maximum processing time as the time we take for sending events to
// a subscriber and to the dead letter sink (including retries).
return 2L * maxProcessingTime + overhead;
final long overhead = 1000L * retry;
maxProcessingTimeForSingleRecord += overhead;
// So far, the max processing time calculated is for one single record,
// however, we poll records in batches based on the `max.poll.records`
// configuration, so the total max processing time is:
return maxPollRecords * maxProcessingTimeForSingleRecord;
}
}

0 comments on commit 581fd58

Please sign in to comment.