Skip to content

Commit

Permalink
test & format
Browse files Browse the repository at this point in the history
  • Loading branch information
yqu63 committed Oct 30, 2024
1 parent b8a2713 commit 5098dc6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 18 deletions.
20 changes: 10 additions & 10 deletions solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,16 @@ public void run() {
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) {
if (head.getId().equals(oldestItemInWorkQueue))
hasLeftOverItems = false;
if (asyncId != null){
if (completedMap.contains(asyncId) || failureMap.contains(asyncId)) {
log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]",
asyncId);
}
if (!asyncIdMap.contains(asyncId)){
log.debug("async id has been removed by canceling. AsyncId [{}]", asyncId);
}
if (head.getId().equals(oldestItemInWorkQueue)) hasLeftOverItems = false;
if (asyncId != null) {
if (completedMap.contains(asyncId) || failureMap.contains(asyncId)) {
log.debug(
"Found already processed task in workQueue, cleaning up. AsyncId [{}]",
asyncId);
}
if (!asyncIdMap.contains(asyncId)) {
log.debug("async id has been removed by canceling. AsyncId [{}]", asyncId);
}
workQueue.remove(head);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void allowOverseerPendingTasksToComplete() {
* @return The path of the task if found, or null if not found.
*/
private String findTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {
throws KeeperException, InterruptedException {

List<String> childNames = zookeeper.getChildren(dir, null, true);
stats.setQueueLength(childNames.size());
Expand All @@ -94,7 +94,8 @@ private String findTaskWithRequestId(String requestIdKey, String requestId)
ZkNodeProps message = ZkNodeProps.load(data);
if (message.containsKey(requestIdKey)) {
if (log.isDebugEnabled()) {
log.debug("Looking for requestId '{}', found '{}'", requestId, message.get(requestIdKey));
log.debug(
"Looking for requestId '{}', found '{}'", requestId, message.get(requestIdKey));
}
if (message.get(requestIdKey).equals(requestId)) {
return path;
Expand All @@ -111,15 +112,15 @@ private String findTaskWithRequestId(String requestIdKey, String requestId)

/** Returns true if the queue contains a task with the specified request ID. */
public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {
throws KeeperException, InterruptedException {

String path = findTaskWithRequestId(requestIdKey, requestId);
return path != null;
}

/** Removes the first task with the specified request ID from the queue. */
public void removeTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {
throws KeeperException, InterruptedException {

String path = findTaskWithRequestId(requestIdKey, requestId);
if (path != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,13 +905,18 @@ public Map<String, Object> execute(
rsp.getValues()
.add(
"status", "successfully removed stored response for [" + requestId + "]");
} else if (force && !zkController.getOverseerRunningMap().contains(requestId) && zkController.getOverseerCollectionQueue().containsTaskWithRequestId(ASYNC,requestId)) {
} else if (force
&& !zkController.getOverseerRunningMap().contains(requestId)
&& zkController
.getOverseerCollectionQueue()
.containsTaskWithRequestId(ASYNC, requestId)) {
// submitted but not started yet
zkController.getOverseerCollectionQueue().removeTaskWithRequestId(ASYNC,requestId);
zkController
.getOverseerCollectionQueue()
.removeTaskWithRequestId(ASYNC, requestId);
zkController.clearAsyncId(requestId);
rsp.getValues()
.add(
"status", "successfully removed submitted task for [" + requestId + "]");
.add("status", "successfully removed submitted task for [" + requestId + "]");
} else {
rsp.getValues()
.add("status", "[" + requestId + "] not found in stored responses");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
private static DistributedMap runningMapMock;
private static DistributedMap completedMapMock;
private static DistributedMap failureMapMock;
private static DistributedMap asyncIdMapMock;
private static HttpShardHandlerFactory shardHandlerFactoryMock;
private static HttpShardHandler shardHandlerMock;
private static ZkStateReader zkStateReaderMock;
Expand Down Expand Up @@ -159,6 +160,7 @@ public OverseerCollectionConfigSetProcessorToBeTested(
Overseer overseer,
DistributedMap completedMap,
DistributedMap failureMap,
DistributedMap asyncIdMap,
SolrMetricsContext solrMetricsContext) {
super(
zkStateReader,
Expand All @@ -172,6 +174,7 @@ public OverseerCollectionConfigSetProcessorToBeTested(
runningMap,
completedMap,
failureMap,
asyncIdMap,
solrMetricsContext);
}

Expand All @@ -190,6 +193,7 @@ public static void setUpOnce() {
runningMapMock = mock(DistributedMap.class);
completedMapMock = mock(DistributedMap.class);
failureMapMock = mock(DistributedMap.class);
asyncIdMapMock = mock(DistributedMap.class);
shardHandlerFactoryMock = mock(HttpShardHandlerFactory.class);
shardHandlerMock = mock(HttpShardHandler.class);
zkStateReaderMock = mock(ZkStateReader.class);
Expand Down Expand Up @@ -968,6 +972,7 @@ protected void testTemplate(
overseerMock,
completedMapMock,
failureMapMock,
asyncIdMapMock,
solrMetricsContextMock);

if (log.isInfoEnabled()) {
Expand Down

0 comments on commit 5098dc6

Please sign in to comment.