Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Add spring support for annotated workers #3492

Merged
merged 1 commit into from
Mar 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -164,11 +164,11 @@ allprojects {
}
}

// all client and their related modules are published with Java 8 compatibility
// all client and their related modules are published with Java 11 compatibility
["annotations", "common", "client", "client-spring", "grpc", "grpc-client"].each {
project(":conductor-$it") {
compileJava {
options.release = 8
options.release = 11
}
}
}
1 change: 1 addition & 0 deletions client-spring/build.gradle
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ dependencies {

implementation project(':conductor-common')
api project(':conductor-client')
api project(':conductor-java-sdk')

implementation "com.netflix.eureka:eureka-client:${revEurekaClient}"
implementation 'org.springframework.boot:spring-boot-starter'
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;
import com.netflix.discovery.EurekaClient;

@Configuration(proxyBeanMethods = false)
@@ -44,6 +45,12 @@ public TaskClient taskClient(ClientProperties clientProperties) {
return taskClient;
}

@ConditionalOnMissingBean
@Bean
public AnnotatedWorkerExecutor annotatedWorkerExecutor(TaskClient taskClient) {
return new AnnotatedWorkerExecutor(taskClient);
}

@ConditionalOnMissingBean
@Bean(initMethod = "init", destroyMethod = "shutdown")
public TaskRunnerConfigurer taskRunnerConfigurer(
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.spring;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;
import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration;

@Component
public class ConductorWorkerAutoConfiguration
implements ApplicationListener<ContextRefreshedEvent> {

@Autowired private TaskClient taskClient;

@Override
public void onApplicationEvent(ContextRefreshedEvent refreshedEvent) {
ApplicationContext applicationContext = refreshedEvent.getApplicationContext();
Environment environment = applicationContext.getEnvironment();
WorkerConfiguration configuration = new SpringWorkerConfiguration(environment);
AnnotatedWorkerExecutor annotatedWorkerExecutor =
new AnnotatedWorkerExecutor(taskClient, configuration);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make sure that this bean is created later than other beans to avoid omissions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed this by doing the initialization using context initialization event.

Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Component.class);
beans.values()
.forEach(
bean -> {
annotatedWorkerExecutor.addBean(bean);
});
annotatedWorkerExecutor.startPolling();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.spring;

import org.springframework.core.env.Environment;

import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration;

public class SpringWorkerConfiguration extends WorkerConfiguration {

private final Environment environment;

public SpringWorkerConfiguration(Environment environment) {
this.environment = environment;
}

@Override
public int getPollingInterval(String taskName) {
String key = "conductor.worker." + taskName + ".pollingInterval";
return environment.getProperty(key, Integer.class, 0);
}

@Override
public int getThreadCount(String taskName) {
String key = "conductor.worker." + taskName + ".threadCount";
return environment.getProperty(key, Integer.class, 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.spring;

import java.util.Date;

import org.springframework.stereotype.Component;

import com.netflix.conductor.sdk.workflow.executor.task.TaskContext;
import com.netflix.conductor.sdk.workflow.task.InputParam;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

@Component
public class Workers {

@WorkerTask(value = "hello", threadCount = 3)
public String helloWorld(@InputParam("name") String name) {
TaskContext context = TaskContext.get();
System.out.println(new Date() + ":: Poll count: " + context.getPollCount());
if (context.getPollCount() < 5) {
context.addLog("Not ready yet, poll count is only " + context.getPollCount());
context.setCallbackAfter(1);
}

return "Hello, " + name;
}

@WorkerTask(value = "hello_again", pollingInterval = 333)
public String helloAgain(@InputParam("name") String name) {
TaskContext context = TaskContext.get();
System.out.println(new Date() + ":: Poll count: " + context.getPollCount());
if (context.getPollCount() < 5) {
context.addLog("Not ready yet, poll count is only " + context.getPollCount());
context.setCallbackAfter(1);
}

return "Hello (again), " + name;
}
}
2 changes: 2 additions & 0 deletions client-spring/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
conductor.client.rootUri=http://localhost:8080/api/
conductor.worker.hello.threadCount=100
Original file line number Diff line number Diff line change
@@ -113,6 +113,7 @@ public WorkflowExecutor(
Workflow workflow = workflowClient.getWorkflow(workflowId, true);
if (workflow.getStatus().isTerminal()) {
future.complete(workflow);
runningWorkflowFutures.remove(workflowId);
}
}
},
@@ -140,6 +141,7 @@ public WorkflowExecutor(
Workflow workflow = workflowClient.getWorkflow(workflowId, true);
if (workflow.getStatus().isTerminal()) {
future.complete(workflow);
runningWorkflowFutures.remove(workflowId);
}
}
},
Original file line number Diff line number Diff line change
@@ -14,10 +14,7 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;

import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
@@ -44,6 +41,9 @@ public class AnnotatedWorker implements Worker {

private int pollingInterval = 100;

private Set<TaskResult.Status> failedStatuses =
Set.of(TaskResult.Status.FAILED, TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);

public AnnotatedWorker(String name, Method workerMethod, Object obj) {
this.name = name;
this.workerMethod = workerMethod;
@@ -58,11 +58,40 @@ public String getTaskDefName() {

@Override
public TaskResult execute(Task task) {
TaskResult result;
TaskResult result = null;
try {
TaskContext context = TaskContext.set(task);
Object[] parameters = getInvocationParameters(task);
Object invocationResult = workerMethod.invoke(obj, parameters);
result = setValue(invocationResult, task);
result = setValue(invocationResult, context.getTaskResult());
if (!failedStatuses.contains(result.getStatus())
&& result.getCallbackAfterSeconds() > 0) {
result.setStatus(TaskResult.Status.IN_PROGRESS);
}
} catch (InvocationTargetException invocationTargetException) {
if (result == null) {
result = new TaskResult(task);
}
Throwable e = invocationTargetException.getCause();
e.printStackTrace();
if (e instanceof NonRetryableException) {
result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
} else {
result.setStatus(TaskResult.Status.FAILED);
}

result.setReasonForIncompletion(e.getMessage());
StringBuilder stackTrace = new StringBuilder();
for (StackTraceElement stackTraceElement : e.getStackTrace()) {
String className = stackTraceElement.getClassName();
if (className.startsWith("jdk.")
|| className.startsWith(AnnotatedWorker.class.getName())) {
break;
}
stackTrace.append(stackTraceElement);
stackTrace.append("\n");
}
result.log(stackTrace.toString());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -141,43 +170,43 @@ private static InputParam findInputParamAnnotation(Annotation[] paramAnnotation)
.orElse(null);
}

private TaskResult setValue(Object invocationResult, Task task) {
private TaskResult setValue(Object invocationResult, TaskResult result) {

if (invocationResult == null) {
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
}

OutputParam opAnnotation =
workerMethod.getAnnotatedReturnType().getAnnotation(OutputParam.class);
if (opAnnotation != null) {

String name = opAnnotation.value();
task.getOutputData().put(name, invocationResult);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().put(name, invocationResult);
result.setStatus(TaskResult.Status.COMPLETED);
return result;

} else if (invocationResult instanceof TaskResult) {

return (TaskResult) invocationResult;

} else if (invocationResult instanceof Map) {
Map resultAsMap = (Map) invocationResult;
task.getOutputData().putAll(resultAsMap);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().putAll(resultAsMap);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
} else if (invocationResult instanceof String
|| invocationResult instanceof Number
|| invocationResult instanceof Boolean) {
task.getOutputData().put("result", invocationResult);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().put("result", invocationResult);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
} else if (invocationResult instanceof List) {

List resultAsList = om.convertValue(invocationResult, List.class);
task.getOutputData().put("result", resultAsList);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().put("result", resultAsList);
result.setStatus(TaskResult.Status.COMPLETED);
return result;

} else if (invocationResult instanceof DynamicForkInput) {
DynamicForkInput forkInput = (DynamicForkInput) invocationResult;
@@ -186,25 +215,28 @@ private TaskResult setValue(Object invocationResult, Task task) {
for (com.netflix.conductor.sdk.workflow.def.tasks.Task<?> sdkTask : tasks) {
workflowTasks.addAll(sdkTask.getWorkflowDefTasks());
}
task.getOutputData().put(DynamicFork.FORK_TASK_PARAM, workflowTasks);
task.getOutputData().put(DynamicFork.FORK_TASK_INPUT_PARAM, forkInput.getInputs());
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().put(DynamicFork.FORK_TASK_PARAM, workflowTasks);
result.getOutputData().put(DynamicFork.FORK_TASK_INPUT_PARAM, forkInput.getInputs());
result.setStatus(TaskResult.Status.COMPLETED);
return result;

} else {
Map resultAsMap = om.convertValue(invocationResult, Map.class);
task.getOutputData().putAll(resultAsMap);
task.setStatus(Task.Status.COMPLETED);
return new TaskResult(task);
result.getOutputData().putAll(resultAsMap);
result.setStatus(TaskResult.Status.COMPLETED);
return result;
}
}

public void setPollingInterval(int pollingInterval) {
System.out.println(
"Setting the polling interval for " + getTaskDefName() + ", to " + pollingInterval);
this.pollingInterval = pollingInterval;
}

@Override
public int getPollingInterval() {
System.out.println("Sending the polling interval to " + pollingInterval);
return pollingInterval;
}
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.reflect.ClassPath;

public class AnnotatedWorkerExecutor {
@@ -33,23 +34,33 @@ public class AnnotatedWorkerExecutor {

private TaskRunnerConfigurer taskRunner;

private List<Worker> executors = new ArrayList<>();

private Map<String, Method> workerExecutors = new HashMap<>();

private Map<String, Integer> workerToThreadCount = new HashMap<>();

private Map<String, Integer> workerToPollingInterval = new HashMap<>();

private Map<String, Object> workerClassObjs = new HashMap<>();

private static Set<String> scannedPackages = new HashSet<>();

private int pollingInteralInMS = 100;
private WorkerConfiguration workerConfiguration;

public AnnotatedWorkerExecutor(TaskClient taskClient) {
this.taskClient = taskClient;
this.workerConfiguration = new WorkerConfiguration();
}

public AnnotatedWorkerExecutor(TaskClient taskClient, int pollingIntervalInMillis) {
this.taskClient = taskClient;
this.workerConfiguration = new WorkerConfiguration(pollingIntervalInMillis);
}

public AnnotatedWorkerExecutor(TaskClient taskClient, int pollingInteralInMS) {
public AnnotatedWorkerExecutor(TaskClient taskClient, WorkerConfiguration workerConfiguration) {
this.taskClient = taskClient;
this.pollingInteralInMS = pollingInteralInMS;
this.workerConfiguration = workerConfiguration;
}

/**
@@ -100,7 +111,7 @@ private void scanWorkers(String basePackage) {
try {
Class<?> clazz = classMeta.load();
Object obj = clazz.getConstructor().newInstance();
scanClass(clazz, obj);
addBean(obj);
} catch (Throwable t) {
// trace because many classes won't have a default no-args
// constructor and will fail
@@ -126,28 +137,48 @@ private boolean includePackage(List<String> packagesToScan, String name) {
return false;
}

private void scanClass(Class<?> clazz, Object obj) {
public void addBean(Object bean) {
Class<?> clazz = bean.getClass();
for (Method method : clazz.getMethods()) {
WorkerTask annotation = method.getAnnotation(WorkerTask.class);
if (annotation == null) {
continue;
}
String name = annotation.value();
int threadCount = annotation.threadCount();
workerExecutors.put(name, method);
workerToThreadCount.put(name, threadCount);
workerClassObjs.put(name, obj);
LOGGER.info("Adding worker for task {}, method {}", name, method);
addMethod(annotation, method, bean);
}
}

private void startPolling() {
List<Worker> executors = new ArrayList<>();
private void addMethod(WorkerTask annotation, Method method, Object bean) {
String name = annotation.value();

int threadCount = workerConfiguration.getThreadCount(name);
if (threadCount == 0) {
threadCount = annotation.threadCount();
}
workerToThreadCount.put(name, threadCount);

int pollingInterval = workerConfiguration.getPollingInterval(name);
if (pollingInterval == 0) {
pollingInterval = annotation.pollingInterval();
}
workerToPollingInterval.put(name, pollingInterval);

workerClassObjs.put(name, bean);
workerExecutors.put(name, method);
LOGGER.info(
"Adding worker for task {}, method {} with threadCount {} and polling interval set to {} ms",
name,
method,
threadCount,
pollingInterval);
}

public void startPolling() {
workerExecutors.forEach(
(taskName, method) -> {
Object obj = workerClassObjs.get(taskName);
AnnotatedWorker executor = new AnnotatedWorker(taskName, method, obj);
executor.setPollingInterval(pollingInteralInMS);
executor.setPollingInterval(workerToPollingInterval.get(taskName));
executors.add(executor);
});

@@ -164,4 +195,14 @@ private void startPolling() {

taskRunner.init();
}

@VisibleForTesting
List<Worker> getExecutors() {
return executors;
}

@VisibleForTesting
TaskRunnerConfigurer getTaskRunner() {
return taskRunner;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.sdk.workflow.executor.task;

/**
* Runtime exception indicating the non-retriable error with the task execution. If thrown, the task
* will fail with FAILED_WITH_TERMINAL_ERROR and will not kick off retries.
*/
public class NonRetryableException extends RuntimeException {

public NonRetryableException(String message) {
super(message);
}

public NonRetryableException(String message, Throwable cause) {
super(message, cause);
}

public NonRetryableException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.sdk.workflow.executor.task;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;

/** Context for the task */
public class TaskContext {

public static final ThreadLocal<TaskContext> TASK_CONTEXT_INHERITABLE_THREAD_LOCAL =
InheritableThreadLocal.withInitial(() -> null);

public TaskContext(Task task, TaskResult taskResult) {
this.task = task;
this.taskResult = taskResult;
}

public static TaskContext get() {
return TASK_CONTEXT_INHERITABLE_THREAD_LOCAL.get();
}

public static TaskContext set(Task task) {
TaskResult result = new TaskResult(task);
TaskContext context = new TaskContext(task, result);
TASK_CONTEXT_INHERITABLE_THREAD_LOCAL.set(context);
return context;
}

private final Task task;

private final TaskResult taskResult;

public String getWorkflowInstanceId() {
return task.getWorkflowInstanceId();
}

public String getTaskId() {
return task.getTaskId();
}

public int getRetryCount() {
return task.getRetryCount();
}

public int getPollCount() {
return task.getPollCount();
}

public long getCallbackAfterSeconds() {
return task.getCallbackAfterSeconds();
}

public void addLog(String log) {
this.taskResult.log(log);
}

public Task getTask() {
return task;
}

public TaskResult getTaskResult() {
return taskResult;
}

public void setCallbackAfter(int seconds) {
this.taskResult.setCallbackAfterSeconds(seconds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.sdk.workflow.executor.task;

public class WorkerConfiguration {

private int defaultPollingInterval = 0;

public WorkerConfiguration(int defaultPollingInterval) {
this.defaultPollingInterval = defaultPollingInterval;
}

public WorkerConfiguration() {}

public int getPollingInterval(String taskName) {
return defaultPollingInterval;
}

public int getThreadCount(String taskName) {
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -23,5 +23,8 @@
public @interface WorkerTask {
String value();

// No. of threads to use for executing the task
int threadCount() default 1;

int pollingInterval() default 100;
}
Original file line number Diff line number Diff line change
@@ -23,12 +23,16 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.sdk.workflow.task.InputParam;
import com.netflix.conductor.sdk.workflow.task.OutputParam;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;

public class AnnotatedWorkerTests {

@@ -202,7 +206,7 @@ void taskAsInputParam() throws NoSuchMethodException {
public @interface AnotherAnnotation {}

static class AnotherAnnotationInput {
@WorkerTask("test_1")
@WorkerTask("test_2")
public @OutputParam("result") Bike doWork(@AnotherAnnotation Bike input) {
return input;
}
@@ -229,7 +233,7 @@ void annotatedWithAnotherAnnotation() throws NoSuchMethodException {
}

static class MultipleInputParams {
@WorkerTask("test_1")
@WorkerTask(value = "test_1", threadCount = 3, pollingInterval = 333)
public Map<String, Object> doWork(
@InputParam("bike") Bike bike, @InputParam("car") Car car) {
return Map.of("bike", bike, "car", car);
@@ -260,4 +264,80 @@ void multipleInputParams() throws NoSuchMethodException {
var car = (Car) outputData.get("car");
assertEquals("BMW", car.getBrand());
}

@Test
@DisplayName("it should honor the polling interval from annotations and config")
void pollingIntervalTest() throws NoSuchMethodException {
var config = new TestWorkerConfig();

var worker = new MultipleInputParams();

AnnotatedWorkerExecutor annotatedWorkerExecutor =
new AnnotatedWorkerExecutor(mock(TaskClient.class));
annotatedWorkerExecutor.addBean(worker);
annotatedWorkerExecutor.startPolling();
List<Worker> workers = annotatedWorkerExecutor.getExecutors();
assertNotNull(workers);
assertEquals(1, workers.size());
Worker taskWorker = workers.get(0);
assertEquals(333, taskWorker.getPollingInterval());

var worker2 = new AnotherAnnotationInput();
annotatedWorkerExecutor = new AnnotatedWorkerExecutor(mock(TaskClient.class));
annotatedWorkerExecutor.addBean(worker2);
annotatedWorkerExecutor.startPolling();
workers = annotatedWorkerExecutor.getExecutors();
assertNotNull(workers);
assertEquals(1, workers.size());
taskWorker = workers.get(0);
assertEquals(100, taskWorker.getPollingInterval());

config.setPollingInterval("test_2", 123);
annotatedWorkerExecutor = new AnnotatedWorkerExecutor(mock(TaskClient.class), config);
annotatedWorkerExecutor.addBean(worker2);
annotatedWorkerExecutor.startPolling();
workers = annotatedWorkerExecutor.getExecutors();
assertNotNull(workers);
assertEquals(1, workers.size());
taskWorker = workers.get(0);
assertEquals(123, taskWorker.getPollingInterval());
}

@Test
@DisplayName("it should honor the polling interval from annotations and config")
void threadCountTest() throws NoSuchMethodException {
var config = new TestWorkerConfig();

var worker = new MultipleInputParams();
var worker2 = new AnotherAnnotationInput();

AnnotatedWorkerExecutor annotatedWorkerExecutor =
new AnnotatedWorkerExecutor(mock(TaskClient.class), config);
annotatedWorkerExecutor.addBean(worker);
annotatedWorkerExecutor.addBean(worker2);

annotatedWorkerExecutor.startPolling();
TaskRunnerConfigurer runner = annotatedWorkerExecutor.getTaskRunner();
assertNotNull(runner);
Map<String, Integer> taskThreadCount = runner.getTaskThreadCount();

assertNotNull(taskThreadCount);
assertEquals(3, taskThreadCount.get("test_1"));
assertEquals(1, taskThreadCount.get("test_2"));

annotatedWorkerExecutor.shutdown();
config.setThreadCount("test_2", 2);
annotatedWorkerExecutor = new AnnotatedWorkerExecutor(mock(TaskClient.class), config);
annotatedWorkerExecutor.addBean(worker);
annotatedWorkerExecutor.addBean(worker2);

annotatedWorkerExecutor.startPolling();
runner = annotatedWorkerExecutor.getTaskRunner();

taskThreadCount = runner.getTaskThreadCount();

assertNotNull(taskThreadCount);
assertEquals(3, taskThreadCount.get("test_1"));
assertEquals(2, taskThreadCount.get("test_2"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.sdk.workflow.executor.task;

import java.util.HashMap;
import java.util.Map;

public class TestWorkerConfig extends WorkerConfiguration {

private Map<String, Integer> pollingIntervals = new HashMap<>();

private Map<String, Integer> threadCounts = new HashMap<>();

@Override
public int getPollingInterval(String taskName) {
return pollingIntervals.getOrDefault(taskName, 0);
}

public void setPollingInterval(String taskName, int interval) {
pollingIntervals.put(taskName, interval);
}

public void setThreadCount(String taskName, int threadCount) {
threadCounts.put(taskName, threadCount);
}

@Override
public int getThreadCount(String taskName) {
return threadCounts.getOrDefault(taskName, 0);
}
}