Skip to content

Commit

Permalink
Merge pull request #3675 from malakaganga/fix_inbound
Browse files Browse the repository at this point in the history
Fix recovery of JMS Inbounds after DB Failure
  • Loading branch information
malakaganga authored Oct 9, 2024
2 parents d2bcfcf + 5adc542 commit 8b6d3d9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void becameUnresponsive(String nodeId) {
// stop all running coordinated tasks.
tasks.forEach(task -> {
try {
taskManager.stopExecution(task);
taskManager.stopExecutionTemporarily(task);
} catch (TaskException e) {
LOG.error("Unable to pause the task " + task, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private void checkInterrupted() throws InterruptedException {
// stop all running coordinated tasks.
tasks.forEach(task -> {
try {
taskManager.stopExecution(task);
taskManager.stopExecutionTemporarily(task);
} catch (TaskException e) {
LOG.error("Unable to pause the task " + task, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ protected synchronized void pauseLocalTask(String taskName) throws TaskException
}
}

protected synchronized void pauseLocalTaskTemporarily(String taskName) throws TaskException {
String taskGroup = this.getTenantTaskGroup();
try {
this.getScheduler().pauseJob(new JobKey(taskName, taskGroup));
log.info("Task temporarily paused: [" + this.getTaskType() + "][" + taskName + "]");
} catch (SchedulerException e) {
throw new TaskException("Error in temporarily pausing task with name: " + taskName,
TaskException.Code.UNKNOWN, e);
}
}

protected String getTenantTaskGroup() {
return "TENANT_" + this.getTenantId() + "_TYPE_" + this.getTaskType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,17 @@ public void stopExecution(String taskName) throws TaskException {
locallyRunningCoordinatedTasks.remove(taskName);
}

/**
* Temporarily stops the execution of the task Since the task should be able to resume after db recovery.
*
* @param taskName - Name of the task.
* @throws TaskException - Exception.
*/
public void stopExecutionTemporarily(String taskName) throws TaskException {
this.pauseLocalTaskTemporarily(taskName);
locallyRunningCoordinatedTasks.remove(taskName);
}

private void scheduleTask(String taskName) throws TaskException {
if (this.isMyTaskTypeRegistered()) {
this.scheduleLocalTask(taskName);
Expand Down

0 comments on commit 8b6d3d9

Please sign in to comment.