Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: After new performance fix PR#530 merges - corner case could cause out of order processing #534

Merged
merged 3 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ endif::[]
=== Fixes

* fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer (#469)
* fix: After new performance fix PR#530 merges - corner case could cause out of order processing (#534)

=== Improvements

* perf: Adds a caching layer to work management to alleviate O(n) counting (#530)

== 0.5.2.4

Expand Down
5 changes: 5 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,11 @@ endif::[]
=== Fixes

* fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer (#469)
* fix: After new performance fix PR#530 merges - corner case could cause out of order processing (#534)

=== Improvements

* perf: Adds a caching layer to work management to alleviate O(n) counting (#530)

== 0.5.2.4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,10 @@ private boolean updateBlockFromEncodingResult(String offsetMapPayload) {
metaPayloadLength, getPressureThresholdValue(), DefaultMaxMetadataSize);

} else { // and thus (metaPayloadLength <= pressureThresholdValue)
setAllowedMoreRecords(true);
if (allowedMoreRecords == false) {
// guard is useful for debugging to catch the transition from false to true
setAllowedMoreRecords(true);
}
log.debug("Payload size {} within threshold {}", metaPayloadLength, getPressureThresholdValue());
}

Expand Down Expand Up @@ -590,9 +593,11 @@ public boolean couldBeTakenAsWork(WorkContainer<K, V> workContainer) {
log.debug("Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: {}", workContainer);
return false;
} else if (isAllowedMoreRecords()) {
log.debug("Partition is allowed more records. Taking work. WC: {}", workContainer);
return true;
} else if (isBlockingProgress(workContainer)) {
// allow the record to be taken, even if the partition is blocked, as completing this record may reduce the payload size requirement
log.debug("Partition is blocked, but this record is blocking progress. Taking work. WC: {}", workContainer);
return true;
} else {
log.debug("Not allowed more records for the partition ({}) as set from previous encode run (blocked), that this " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
var workContainer = iterator.next().getValue();

if (pm.couldBeTakenAsWork(workContainer)) {

if (workContainer.isAvailableToTakeAsWork()) {
log.trace("Taking {} as work", workContainer);
workContainer.onQueueingForExecution();
workTaken.add(workContainer);
} else {
log.trace("Skipping {} as work, not available to take as work", workContainer);
addToSlowWorkMaybe(slowWork, workContainer);
}

Expand All @@ -122,6 +122,15 @@ ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int workToGetDelta) {
log.trace("Processing by {}, so have cannot get more messages on this ({}) shardEntry.", this.options.getOrdering(), getKey());
break;
}
} else {
// break, assuming all work in this shard is for the same ShardKey, which is always on the same
// partition (regardless of ordering mode - KEY, PARTITION or UNORDERED (which is parallel PARTITIONs)),
// so there is no point continuing the shard scan. The only case where this isn't true is if a non-standard
// partitioner produced records with the same key to different partitions. In that case, there's no way PC
// can ensure that all records belonging to the shard are even able to be processed by the same PC
// instance, so it doesn't matter.
log.trace("Partition for shard {} is blocked for work taking, stopping shard scan", this);
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public boolean isUserFunctionSucceeded() {

@Override
public String toString() {
return "WorkContainer(" + toTopicPartition(cr) + ":" + cr.offset() + ":" + cr.key() + ")";
return "WorkContainer(tp:" + toTopicPartition(cr) + ":o:" + cr.offset() + ":k:" + cr.key() + ")";
}

public Duration getTimeInFlight() {
Expand Down