Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Jan 24, 2025
1 parent a766d1f commit ef49e9e
Showing 1 changed file with 37 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,53 +142,55 @@ private static String elapsedString(Instant start, Instant end) {
* <p>4. STALE: A work queue for the {@link ShardedKey} exists, and there is a queued {@link Work}
* with a greater workToken than the passed in {@link Work}.
*/
synchronized ActivateWorkResult activateWorkForKey(ExecutableWork executableWork) {
ActivateWorkResult activateWorkForKey(ExecutableWork executableWork) {
ShardedKey shardedKey = executableWork.work().getShardedKey();
WorkIdWithShardingKey workIdWithShardingKey =
WorkIdWithShardingKey.builder()
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(executableWork.getWorkItem().getWorkToken())
.setCacheToken(executableWork.getWorkItem().getCacheToken())
.build();
Deque<ExecutableWork> workQueue = activeWork.getOrDefault(shardedKey, new ArrayDeque<>());
// This key does not have any work queued up on it. Create one, insert Work, and mark the work
// to be executed.
if (!activeWork.containsKey(shardedKey) || workQueue.isEmpty()) {
workQueue.addLast(executableWork);
workIndex.put(workIdWithShardingKey, executableWork);
activeWork.put(shardedKey, workQueue);
incrementActiveWorkBudget(executableWork.work());
return ActivateWorkResult.EXECUTE;
}

// Check to see if we have this work token queued.
Iterator<ExecutableWork> workIterator = workQueue.iterator();
while (workIterator.hasNext()) {
ExecutableWork queuedWork = workIterator.next();
if (queuedWork.id().equals(executableWork.id())) {
return ActivateWorkResult.DUPLICATE;
synchronized (this) {
Deque<ExecutableWork> workQueue = activeWork.getOrDefault(shardedKey, new ArrayDeque<>());
// This key does not have any work queued up on it. Create one, insert Work, and mark the work
// to be executed.
if (!activeWork.containsKey(shardedKey) || workQueue.isEmpty()) {
workQueue.addLast(executableWork);
workIndex.put(workIdWithShardingKey, executableWork);
activeWork.put(shardedKey, workQueue);
incrementActiveWorkBudget(executableWork.work());
return ActivateWorkResult.EXECUTE;
}
if (queuedWork.id().cacheToken() == executableWork.id().cacheToken()) {
if (executableWork.id().workToken() > queuedWork.id().workToken()) {
// Check to see if the queuedWork is active. We only want to remove it if it is NOT
// currently active.
if (!queuedWork.equals(workQueue.peek())) {
workIterator.remove();
workIndex.remove(workIdWithShardingKey);
decrementActiveWorkBudget(queuedWork.work());

// Check to see if we have this work token queued.
Iterator<ExecutableWork> workIterator = workQueue.iterator();
while (workIterator.hasNext()) {
ExecutableWork queuedWork = workIterator.next();
if (queuedWork.id().equals(executableWork.id())) {
return ActivateWorkResult.DUPLICATE;
}
if (queuedWork.id().cacheToken() == executableWork.id().cacheToken()) {
if (executableWork.id().workToken() > queuedWork.id().workToken()) {
// Check to see if the queuedWork is active. We only want to remove it if it is NOT
// currently active.
if (!queuedWork.equals(workQueue.peek())) {
workIterator.remove();
workIndex.remove(workIdWithShardingKey);
decrementActiveWorkBudget(queuedWork.work());
}
// Continue here to possibly remove more non-active stale work that is queued.
} else {
return ActivateWorkResult.STALE;
}
// Continue here to possibly remove more non-active stale work that is queued.
} else {
return ActivateWorkResult.STALE;
}
}
}

// Queue the work for later processing.
workQueue.addLast(executableWork);
workIndex.put(workIdWithShardingKey, executableWork);
incrementActiveWorkBudget(executableWork.work());
return ActivateWorkResult.QUEUED;
// Queue the work for later processing.
workQueue.addLast(executableWork);
workIndex.put(workIdWithShardingKey, executableWork);
incrementActiveWorkBudget(executableWork.work());
return ActivateWorkResult.QUEUED;
}
}

/**
Expand Down

0 comments on commit ef49e9e

Please sign in to comment.