Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not create trace log message when trace logging is not enabled #544

Merged
merged 6 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,10 @@ protected <R> void controlLoop(Function<PollContextInternal<K, V>, List<R>> user
}

// end of loop
log.trace("End of control loop, waiting processing {}, remaining in partition queues: {}, out for processing: {}. In state: {}",
wm.getNumberOfWorkQueuedInShardsAwaitingSelection(), wm.getNumberOfIncompleteOffsets(), wm.getNumberRecordsOutForProcessing(), state);
if (log.isTraceEnabled()) {
log.trace("End of control loop, waiting processing {}, remaining in partition queues: {}, out for processing: {}. In state: {}",
wm.getNumberOfWorkQueuedInShardsAwaitingSelection(), wm.getNumberOfIncompleteOffsets(), wm.getNumberRecordsOutForProcessing(), state);
}
}

/**
Expand Down Expand Up @@ -872,10 +874,12 @@ private static <T> List<List<T>> partition(Collection<T> sourceCollection, int m
listOfBatches.add(batchInConstruction);
}

log.debug("sourceCollection.size() {}, batches: {}, batch sizes {}",
sourceCollection.size(),
listOfBatches.size(),
listOfBatches.stream().map(List::size).collect(Collectors.toList()));
if (log.isDebugEnabled()) {
log.debug("sourceCollection.size() {}, batches: {}, batch sizes {}",
sourceCollection.size(),
listOfBatches.size(),
listOfBatches.stream().map(List::size).collect(Collectors.toList()));
}
return listOfBatches;
}

Expand Down Expand Up @@ -1097,16 +1101,18 @@ private int getNumberOfUserFunctionsQueued() {
* @return true if waiting to commit would help performance
*/
private boolean lingeringOnCommitWouldBeBeneficial() {
// work is waiting to be done
boolean workIsWaitingToBeCompletedSuccessfully = wm.workIsWaitingToBeProcessed();
// no work is currently being done
boolean workInFlight = wm.hasWorkInFlight();
// work mailbox is empty
boolean workWaitingInMailbox = !workMailBox.isEmpty();
boolean workWaitingToProcess = wm.hasIncompleteOffsets();
log.trace("workIsWaitingToBeCompletedSuccessfully {} || workInFlight {} || workWaitingInMailbox {} || !workWaitingToProcess {};",
workIsWaitingToBeCompletedSuccessfully, workInFlight, workWaitingInMailbox, !workWaitingToProcess);
boolean result = workIsWaitingToBeCompletedSuccessfully || workInFlight || workWaitingInMailbox || !workWaitingToProcess;
if (log.isTraceEnabled()) {
// work is waiting to be done
boolean workIsWaitingToBeCompletedSuccessfully = wm.workIsWaitingToBeProcessed();
// no work is currently being done
boolean workInFlight = wm.hasWorkInFlight();
// work mailbox is empty
boolean workWaitingInMailbox = !workMailBox.isEmpty();
boolean workWaitingToProcess = wm.hasIncompleteOffsets();
log.trace("workIsWaitingToBeCompletedSuccessfully {} || workInFlight {} || workWaitingInMailbox {} || !workWaitingToProcess {};",
workIsWaitingToBeCompletedSuccessfully, workInFlight, workWaitingInMailbox, !workWaitingToProcess);
boolean result = workIsWaitingToBeCompletedSuccessfully || workInFlight || workWaitingInMailbox || !workWaitingToProcess;
}

// todo disable - commit frequency takes care of lingering? is this outdated?
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,25 @@ private void logSlowWork(Set<WorkContainer<?, ?>> slowWork) {
}

private void addToSlowWorkMaybe(Set<WorkContainer<?, ?>> slowWork, WorkContainer<?, ?> workContainer) {
var msgTemplate = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
Duration timeInFlight = workContainer.getTimeInFlight();
var msg = msg(msgTemplate, workContainer, workContainer.isDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight);
Duration slowThreshold = options.getThresholdForTimeSpendInQueueWarning();
if (isGreaterThan(timeInFlight, slowThreshold)) {
slowWork.add(workContainer);
log.trace("Work has spent over " + slowThreshold + " in queue! " + msg);
if (log.isTraceEnabled()){
log.trace("Work has spent over " + slowThreshold + " in queue! " + cantTakeAsWorkMsg(workContainer, timeInFlight));
}
} else {
log.trace(msg);
if (log.isTraceEnabled()) {
log.trace(cantTakeAsWorkMsg(workContainer, timeInFlight));
}
}
}

private static String cantTakeAsWorkMsg(WorkContainer<?, ?> workContainer, Duration timeInFlight) {
var msgTemplate = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
return msg(msgTemplate, workContainer, workContainer.isDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight);
}

private boolean isOrderRestricted() {
return options.getOrdering() != UNORDERED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,14 @@ public List<WorkContainer<K, V>> getWorkIfAvailable(final int requestedMaxWorkTo
var work = sm.getWorkIfAvailable(requestedMaxWorkToRetrieve);

//
log.debug("Got {} of {} requested records of work. In-flight: {}, Awaiting in commit (partition) queues: {}",
work.size(),
requestedMaxWorkToRetrieve,
getNumberRecordsOutForProcessing(),
getNumberOfIncompleteOffsets());
if (log.isDebugEnabled()) {
log.debug("Got {} of {} requested records of work. In-flight: {}, Awaiting in commit (partition) queues: {}",
work.size(),
requestedMaxWorkToRetrieve,
getNumberRecordsOutForProcessing(),
getNumberOfIncompleteOffsets());
}
numberRecordsOutForProcessing += work.size();

return work;
}

Expand Down