Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-16612][Master] For logical tasks on the Master, there should be support for dry run #16616

Merged
merged 17 commits into from
Oct 12, 2024

Conversation

shouwangyw
Copy link
Contributor

Purpose of the pull request

This pull request improve support for dry run for logical tasks on the Master
fix #16612

Brief change log

  • Change AsyncMasterTaskDelayQueueLooper add asyncTaskExecutionStatus support dry run

Verify this pull request

This pull request is code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(or)

Pull Request Notice

Pull Request Notice

If your pull request contain incompatible change, you should also add it to docs/docs/en/guide/upgrede/incompatible.md

@SbloodyS SbloodyS added improvement make more easy to user or prompt friendly priority:high labels Sep 13, 2024
@SbloodyS SbloodyS added this to the 3.3.0 milestone Sep 13, 2024
SbloodyS
SbloodyS previously approved these changes Sep 13, 2024
Copy link
Member

@SbloodyS SbloodyS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add Integration test case for this PR? You can find some example under master/test/org/apache/dolphinscheduler/server/master/it

@shouwangyw
Copy link
Contributor Author

Could you please add Integration test case for this PR? You can find some example under master/test/org/apache/dolphinscheduler/server/master/it

Okay, no problem, I will refer to the previous test cases.

@ruanwenjun
Copy link
Member

It's needed to add integration test case to test the whole runtime case rather than ut.

@shouwangyw
Copy link
Contributor Author

shouwangyw commented Sep 18, 2024

It's needed to add integration test case to test the whole runtime case rather than ut.

Okay, but can you tell me how to run these integration test cases? They all throw errors when I run them on my local IDEA.
Is it necessary to depend on some specific local environments?


The error info:

java.lang.IllegalStateException: Failed to load ApplicationContext
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:132)
	at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:124)
	at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
	at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
	at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:248)
	at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:138)
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$8(ClassBasedTestDescriptor.java:363)
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.executeAndMaskThrowable(ClassBasedTestDescriptor.java:368)
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$9(ClassBasedTestDescriptor.java:363)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:312)
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:743)
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestInstancePostProcessors(ClassBasedTestDescriptor.java:362)
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$instantiateAndPostProcessTestInstance$6(ClassBasedTestDescriptor.java:283)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:282)
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$4(ClassBasedTestDescriptor.java:272)
	at java.util.Optional.orElseGet(Optional.java:267)
	at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$5(ClassBasedTestDescriptor.java:271)
	at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:102)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:101)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$2(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:123)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:90)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'masterServer': Post-processing of merged bean definition failed; nested exception is java.lang.IllegalStateException: Failed to introspect Class [org.apache.dolphinscheduler.server.master.MasterServer] from ClassLoader [sun.misc.Launcher$AppClassLoader@18b4aac2]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:597)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:955)
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:308)
	at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:132)
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124)
	... 71 more
Caused by: java.lang.IllegalStateException: Failed to introspect Class [org.apache.dolphinscheduler.server.master.MasterServer] from ClassLoader [sun.misc.Launcher$AppClassLoader@18b4aac2]
	at org.springframework.util.ReflectionUtils.getDeclaredFields(ReflectionUtils.java:743)
	at org.springframework.util.ReflectionUtils.doWithLocalFields(ReflectionUtils.java:675)
	at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.buildResourceMetadata(CommonAnnotationBeanPostProcessor.java:377)
	at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.findResourceMetadata(CommonAnnotationBeanPostProcessor.java:358)
	at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.postProcessMergedBeanDefinition(CommonAnnotationBeanPostProcessor.java:306)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyMergedBeanDefinitionPostProcessors(AbstractAutowireCapableBeanFactory.java:1116)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:594)
	... 85 more
Caused by: java.lang.NoClassDefFoundError: org/apache/dolphinscheduler/extract/base/server/SpringServerMethodInvokerDiscovery
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.getDeclaredFields0(Native Method)
	at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
	at java.lang.Class.getDeclaredFields(Class.java:1916)
	at org.springframework.util.ReflectionUtils.getDeclaredFields(ReflectionUtils.java:738)
	... 91 more
Caused by: java.lang.ClassNotFoundException: org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 107 more

@shouwangyw
Copy link
Contributor Author

shouwangyw commented Sep 20, 2024

It's needed to add integration test case to test the whole runtime case rather than ut.

It's needed to add integration test case to test the whole runtime case rather than ut.

Okay, but can you tell me how to run these integration test cases? They all throw errors when I run them on my local IDEA. Is it necessary to depend on some specific local environments?

I found the real reason for the integration test error. Some fields in the YAML file need to be renamed, for example: processDefinitionCode needs to be replaced with workflowDefinitionCode, and processDefinitionVersion needs to be replaced with workflowDefinitionVersion, etc. @ruanwenjun

And there are some errors in the assertions as well.

@ruanwenjun
Copy link
Member

It's needed to add integration test case to test the whole runtime case rather than ut.

It's needed to add integration test case to test the whole runtime case rather than ut.

Okay, but can you tell me how to run these integration test cases? They all throw errors when I run them on my local IDEA. Is it necessary to depend on some specific local environments?

I found the real reason for the integration test error. Some fields in the YAML file need to be renamed, for example: processDefinitionCode needs to be replaced with workflowDefinitionCode, and processDefinitionVersion needs to be replaced with workflowDefinitionVersion, etc. @ruanwenjun

And there are some errors in the assertions as well.

It's better to submit another PR to fix these IT.

@ruanwenjun
Copy link
Member

This is caused by the IT case has been skipped in ci, I will fixed this.

@ruanwenjun
Copy link
Member

This problem has been fixed by #16637, please rebase from upstream and add new case


@Test
@DisplayName("Test start a workflow with one sub workflow task(A) dry run success")
public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the parent workflow using dry run but still generate sub workflow instance? If use dry run then the SubWorkflow task will not generate sub workflow instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the parent workflow using dry run but still generate sub workflow instance? If use dry run then the SubWorkflow task will not generate sub workflow instance.

Child workflow inherits the dry run configuration of the parent workflow.
This is indeed the effect, and the sub-workflow will also dry run.

image image

Copy link
Member

@ruanwenjun ruanwenjun Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might make confusion, since physical task dry run will not go into the task plugin logic, you can find the implementation in WorkerTaskExecutor. It's better to keep this consistent.

You should do this change in MasterTaskExecutor, otherwise the other logic task still doesn't support dry run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might make confusion, since physical task dry run will not go into the task plugin logic, you can find the implementation in WorkerTaskExecutor. It's better to keep this consistent.

You should do this change in MasterTaskExecutor, otherwise the other logic task still doesn't support dry run.

OK, I'll check it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might make confusion, since physical task dry run will not go into the task plugin logic, you can find the implementation in WorkerTaskExecutor. It's better to keep this consistent.

You should do this change in MasterTaskExecutor, otherwise the other logic task still doesn't support dry run.

Check whether it is dry run in AsyncMasterTaskDelayQueueLooper, it may also be necessary!

Comment on lines 91 to 98
AsyncTaskExecuteFunction.AsyncTaskExecutionStatus asyncTaskExecutionStatus;
if (taskExecutionContext.getDryRun() == DRY_RUN_FLAG_YES) {
log.info(
"The current execute mode is dry run check again, will stop the logic task and set the taskInstance status to success");
asyncTaskExecutionStatus = AsyncTaskExecuteFunction.AsyncTaskExecutionStatus.SUCCESS;
} else {
asyncTaskExecutionStatus = asyncTaskExecuteFunction.getAsyncTaskExecutionStatus();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can revert this change, since the Async task will not put into asyncMasterTaskDelayQueue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can revert this change, since the Async task will not put into asyncMasterTaskDelayQueue

This is for code robustness considerations. I'm not sure if there are other places where events can be sent.

Comment on lines +104 to +112
if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis());
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
logicTaskInstanceExecutionEventSenderManager.successEventSender().sendMessage(taskExecutionContext);
log.info(
"The current execute mode is dry run, will stop the logic task and set the taskInstance status to success");
return;
}
Copy link
Member

@ruanwenjun ruanwenjun Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to move this after beforeExecute I am not sure if the statemachine will throw exception, since the task lifecycle might miss running event, so the task state will be dispatched, and dispatched task cannot be converted to success.

Copy link
Contributor Author

@shouwangyw shouwangyw Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to move this after beforeExecute I am not sure if the statemachine will throw exception, since the task lifecycle might miss running event, so the task state will be dispatched, and dispatched task cannot be converted to success.

I find the implementation in WorkerTaskExecutor,this is before beforeExecute.

Copy link
Contributor Author

@shouwangyw shouwangyw Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test Result:
image
This is not the effect we want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have change, test is ok

image image

Copy link
Member

@ruanwenjun ruanwenjun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, good job, wait the ci pass.

Copy link

sonarcloud bot commented Oct 11, 2024

Copy link

sonarcloud bot commented Oct 12, 2024

Copy link
Member

@SbloodyS SbloodyS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@SbloodyS SbloodyS merged commit ccedd2e into apache:dev Oct 12, 2024
69 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Improvement][Master] For logical tasks on the Master, there should be support for dry run
3 participants