[Bug][AbstractDelayEventBus] TaskRetryLifecycleEvent block other events #16976

Open
3 tasks done
reele opened this issue Jan 23, 2025 · 7 comments
Labels
backend bug Something isn't working improvement make more easy to user or prompt friendly priority:high

Comments

@reele
Contributor

reele commented Jan 23, 2025

Search before asking

  • I had searched in the issues and found no similar feature requirement.

Description

Image

A long blocking period makes the master server unresponsive.

So I'm thinking: how about adding a dedicated queue for delayed events?
Or we could use a PriorityBlockingQueue and manually implement DelayQueue's logic, like org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue does?

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

@reele reele added improvement make more easy to user or prompt friendly Waiting for reply Waiting for reply labels Jan 23, 2025
@SbloodyS
Member

cc @ruanwenjun

@SbloodyS SbloodyS added backend and removed Waiting for reply Waiting for reply labels Jan 23, 2025
@ruanwenjun
Member

ruanwenjun commented Jan 23, 2025

DelayQueue is backed by a priority queue, so element 1 should be returned here and it will not block other events.
The main constraint is that the event bus worker polls events from the event bus at an interval (100 ms by default). One worker has to fire multiple event buses, so it cannot block on any single one. That is fine, though, because poll is very fast.
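To make the polling point concrete, here is a minimal sketch of one worker pass over several buses. It assumes each bus is a plain `java.util.concurrent.DelayQueue`; the class `PollingWorkerSketch`, its `Event` type and the method `fireOnce` are hypothetical names for illustration, not the project's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class PollingWorkerSketch {

    /** Hypothetical event type, ordered by the time at which it expires. */
    static final class Event implements Delayed {
        final String name;
        final long expireAtNanos;

        Event(String name, long delayMillis) {
            this.name = name;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(expireAtNanos, ((Event) other).expireAtNanos);
        }
    }

    /** One pass: take at most one expired event from each bus, never waiting on any of them. */
    static List<String> fireOnce(List<DelayQueue<Event>> buses) {
        List<String> fired = new ArrayList<>();
        for (DelayQueue<Event> bus : buses) {
            // poll() returns null immediately when the head of the queue has not expired yet
            Event event = bus.poll();
            if (event != null) {
                fired.add(event.name);
            }
        }
        return fired;
    }
}
```

Because `poll()` only ever inspects the head of the queue, a head that has not expired hides everything behind it until the next pass finds it expired.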

It would be great if we could move PriorityDelayQueue under dolphinscheduler-eventbus, since the main logic is the same in DelayEntry and AbstractDelayEvent.

@reele
Contributor Author

reele commented Jan 23, 2025

@ruanwenjun
I ran a test with a task configured to retry 5 minutes after a failure: I started the workflow, the task failed and was waiting to retry, and then I stopped the workflow. The workflow only stopped after 5 minutes.

...

[WI-0][TI-0] - 2025-01-22 14:55:19.537 INFO  [MasterRpcServer-methodInvoker-5] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: TaskRunningLifecycleEvent{task=<Task-with-retry>, runtimeContext=null}
[WI-3954361][TI-0] - 2025-01-22 14:55:19.641 INFO  [ds-workflow-eventbus-worker-11] o.a.d.s.m.e.t.l.h.AbstractTaskLifecycleEventHandler:[47] - Fired task <Task-with-retry> TaskRunningLifecycleEvent{task=<Task-with-retry>, runtimeContext=null} with state RUNNING_EXECUTION

[WI-0][TI-0] - 2025-01-22 14:55:20.400 INFO  [MasterRpcServer-methodInvoker-12] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: TaskFailedLifecycleEvent{task=<Task-with-retry>, endTime=Wed Jan 22 14:55:20 GMT+08:00 2025}
[WI-3954361][TI-0] - 2025-01-22 14:55:20.445 INFO  [ds-workflow-eventbus-worker-10] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: TaskRetryLifecycleEvent{task=<Task-with-retry>, delayTime=300096/ms}
[WI-3954361][TI-0] - 2025-01-22 14:55:20.447 INFO  [ds-workflow-eventbus-worker-10] o.a.d.s.m.e.t.l.h.AbstractTaskLifecycleEventHandler:[47] - Fired task <Task-with-retry> TaskFailedLifecycleEvent{task=<Task-with-retry>, endTime=Wed Jan 22 14:55:20 GMT+08:00 2025} with state RUNNING_EXECUTION

[WI-0][TI-0] - 2025-01-22 14:55:34.205 INFO  [MasterRpcServer-methodInvoker-27] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: WorkflowStopLifecycleEvent{workflow=<Workflow-with-retry-task>-20250122145518737}


@@@@#### here was blocking WorkflowStopLifecycleEvent for 5mins ####@@@@


[WI-3954361][TI-0] - 2025-01-22 15:00:20.577 INFO  [ds-workflow-eventbus-worker-20] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: TaskStartLifecycleEvent{task=<Task-with-retry>}
[WI-3954361][TI-0] - 2025-01-22 15:00:20.578 INFO  [ds-workflow-eventbus-worker-20] o.a.d.s.m.e.t.l.h.AbstractTaskLifecycleEventHandler:[47] - Fired task <Task-with-retry> TaskRetryLifecycleEvent{task=<Task-with-retry>, delayTime=300096/ms} with state FAILURE
[WI-3954361][TI-0] - 2025-01-22 15:00:20.579 INFO  [ds-workflow-eventbus-worker-20] o.a.d.s.m.e.w.l.h.AbstractWorkflowLifecycleEventHandler:[47] - Begin fire workflow <Workflow-with-retry-task>-20250122145518737 LifecycleEvent[WorkflowStopLifecycleEvent{workflow=<Workflow-with-retry-task>-20250122145518737}] with state: RUNNING_EXECUTION
[WI-3954361][TI-0] - 2025-01-22 15:00:20.582 INFO  [ds-workflow-eventbus-worker-20] o.a.d.s.m.e.w.s.AbstractWorkflowStateAction:[150] - Success set WorkflowExecuteRunnable: <Workflow-with-retry-task>-20250122145518737 state from: RUNNING_EXECUTION to READY_STOP

...

And I just found the main cause:

    public int compareTo(Delayed other) {
        return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano);
    }

AbstractDelayEvent uses createTimeInNano to compare itself with other events, so DelayQueue orders the events by createTimeInNano. The retry event was put in the queue first, so it sits at the head, and DelayQueue will not hand out any later event until the retry event's delay has expired.

If I change the compared value from createTimeInNano to createTimeInNano + delayTime (expressed in the same time unit), the retry event no longer blocks the zero-delay events that follow it.
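A minimal, self-contained sketch of the behaviour described above. The `Event` class is a hypothetical stand-in for AbstractDelayEvent (only the field name createTimeInNano comes from the snippet), and the `orderByExpiry` flag exists just to switch between the two orderings in one demo.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayOrderingDemo {

    /** Hypothetical stand-in for AbstractDelayEvent; not the project's class. */
    static final class Event implements Delayed {
        final String name;
        final long createTimeInNano = System.nanoTime();
        final long delayNanos;
        final boolean orderByExpiry;

        Event(String name, long delayMillis, boolean orderByExpiry) {
            this.name = name;
            this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
            this.orderByExpiry = orderByExpiry;
        }

        long expireTimeInNano() {
            return createTimeInNano + delayNanos;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireTimeInNano() - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            Event o = (Event) other;
            return orderByExpiry
                    ? Long.compare(expireTimeInNano(), o.expireTimeInNano()) // proposed ordering
                    : Long.compare(createTimeInNano, o.createTimeInNano);    // ordering from the snippet
        }
    }

    /** Queues a 5-minute retry event, then a zero-delay stop event, and polls once. */
    static String firstPolled(boolean orderByExpiry) {
        DelayQueue<Event> queue = new DelayQueue<>();
        queue.offer(new Event("retry", 300_000, orderByExpiry));
        queue.offer(new Event("stop", 0, orderByExpiry));
        Event head = queue.poll(); // non-blocking; null when the head has not expired
        return head == null ? null : head.name;
    }

    public static void main(String[] args) {
        System.out.println("ordered by create time: " + firstPolled(false)); // prints null
        System.out.println("ordered by expire time: " + firstPolled(true));  // prints stop
    }
}
```

With create-time ordering the unexpired retry event stays at the head, so `poll()` returns nothing even though the stop event is already due. With expire-time ordering the stop event moves to the head and is returned immediately.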

@reele
Contributor Author

reele commented Jan 23, 2025

> It would be great if we could move PriorityDelayQueue under dolphinscheduler-eventbus, since the main logic is the same in DelayEntry and AbstractDelayEvent.

Yeah, I would prefer to do it in two steps: first prioritize fixing the existing issue, then refactor the code later. How about that?

@ruanwenjun
Member

ruanwenjun commented Jan 23, 2025

> > It would be great if we could move PriorityDelayQueue under dolphinscheduler-eventbus, since the main logic is the same in DelayEntry and AbstractDelayEvent.
>
> Yeah, I would prefer to do it in two steps: first prioritize fixing the existing issue, then refactor the code later. How about that?

LGTM, I created two sub-issues and assigned them to you.

@ruanwenjun
Member

ruanwenjun commented Jan 23, 2025

> @ruanwenjun I ran a test with a task configured to retry 5 minutes after a failure: I started the workflow, the task failed and was waiting to retry, and then I stopped the workflow. The workflow only stopped after 5 minutes.

> And I just found the main cause:
>
> dolphinscheduler/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java
> Lines 62 to 64 in 352b47b
>
>     public int compareTo(Delayed other) {
>         return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano);
>     }
>
> AbstractDelayEvent uses createTimeInNano to compare itself with other events, so DelayQueue orders the events by createTimeInNano. The retry event was put in the queue first, so DelayQueue will take the retry event first.
>
> If I change the compared value from createTimeInNano to createTimeInNano + delayTime, the retry event no longer blocks the zero-delay events that follow it.

This is a bug 👍.

If the event has not expired, we should compare by expire time; if the event has already expired, we should compare by create time.
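One way to read that rule, as a rough sketch only. The `Event` class here is hypothetical, the current time is passed in explicitly so the result is easy to check, and placing an expired event ahead of a pending one in the mixed case is my assumption, since the comment does not spell that case out.

```java
class ExpiryAwareOrdering {

    /** Hypothetical minimal event; the field names follow the thread, not the project's class. */
    static final class Event {
        final long createTimeInNano;
        final long delayNanos;

        Event(long createTimeInNano, long delayNanos) {
            this.createTimeInNano = createTimeInNano;
            this.delayNanos = delayNanos;
        }

        long expireTimeInNano() {
            return createTimeInNano + delayNanos;
        }

        boolean isExpired(long nowNanos) {
            return expireTimeInNano() <= nowNanos;
        }
    }

    /** Negative when a should be taken before b, given the current time. */
    static int compare(Event a, Event b, long nowNanos) {
        boolean aExpired = a.isExpired(nowNanos);
        boolean bExpired = b.isExpired(nowNanos);
        if (aExpired && bExpired) {
            // both are due: keep the order in which they were created
            return Long.compare(a.createTimeInNano, b.createTimeInNano);
        }
        if (!aExpired && !bExpired) {
            // both still pending: the one that expires sooner goes first
            return Long.compare(a.expireTimeInNano(), b.expireTimeInNano());
        }
        // assumption: an expired event goes ahead of a pending one
        return aExpired ? -1 : 1;
    }
}
```

One caveat: a heap-backed queue such as PriorityQueue calls compareTo only while inserting or removing elements, so a result that depends on the current time is not re-evaluated for elements that are already sitting in the queue.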

@ruanwenjun ruanwenjun added bug Something isn't working priority:high labels Jan 23, 2025
@SbloodyS SbloodyS changed the title [Improvement][AbstractDelayEventBus] TaskRetryLifecycleEvent block other events [Bug][AbstractDelayEventBus] TaskRetryLifecycleEvent block other events Jan 23, 2025
@reele
Contributor Author

reele commented Jan 23, 2025

> If the event has not expired, we should compare by expire time; if the event has already expired, we should compare by create time.

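    /* PriorityQueue.class (decompiled): var1 is the insertion index, var2 the new element */
    private void siftUpComparable(int var1, E var2) {
        Comparable var3;
        int var4;
        for(var3 = (Comparable)var2; var1 > 0; var1 = var4) {
            var4 = var1 - 1 >>> 1; // index of the parent node
            Object var5 = this.queue[var4];
            if (var3.compareTo(var5) >= 0) {
                break; // the new element is not smaller than its parent, so it stays here
            }

            this.queue[var1] = var5; // move the parent down one level
        }

        this.queue[var1] = var3;
    }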

It seems the comparison only affects the new element in offer(). If earlier events have already expired, a new event is still placed after them, so I think comparing by expire time alone still works.
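A small check of that claim, as a sketch with made-up numbers: a plain PriorityQueue ordered by expire time, with "now" taken to be 100. The class `ExpiryOrderCheck`, its `Event` type and the event names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class ExpiryOrderCheck {

    /** Hypothetical event that only carries its absolute expire time. */
    static final class Event {
        final String name;
        final long expireTime;

        Event(String name, long expireTime) {
            this.name = name;
            this.expireTime = expireTime;
        }
    }

    /** Offers events in arrival order and returns the order in which they leave the queue. */
    static List<String> drainOrder() {
        PriorityQueue<Event> queue =
                new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.expireTime));
        // Pretend the current time is 100.
        queue.offer(new Event("expired-a", 40));       // already expired
        queue.offer(new Event("expired-b", 70));       // already expired
        queue.offer(new Event("pending-retry", 500));  // still waiting
        queue.offer(new Event("new-zero-delay", 100)); // created now with zero delay
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name);
        }
        return order;
    }

    public static void main(String[] args) {
        // prints [expired-a, expired-b, new-zero-delay, pending-retry]
        System.out.println(drainOrder());
    }
}
```

An event created now cannot expire earlier than one that has already expired, so it sorts behind the expired events and ahead of the pending retry.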
