diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 93475226e8e69e..e79401bdfdabed 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -443,6 +443,7 @@ class WorkflowNodeExecutionConfig(BaseSettings): """ Configuration for workflow node execution """ + MAX_SUBMIT_COUNT: PositiveInt = Field( description="Maximum number of submitted thread count in a ThreadPool for parallel node execution", default=100, diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 0274ec0c39dc64..034b4bd3992dcb 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -53,12 +53,12 @@ class GraphEngineThreadPool(ThreadPoolExecutor): def __init__( - self, - max_workers=None, + self, + max_workers=None, thread_name_prefix="", - initializer=None, - initargs=(), - max_submit_count=dify_config.MAX_SUBMIT_COUNT + initializer=None, + initargs=(), + max_submit_count=dify_config.MAX_SUBMIT_COUNT, ) -> None: super().__init__(max_workers, thread_name_prefix, initializer, initargs) self.max_submit_count = max_submit_count diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index af88a882fbc6dd..d935228c16abe9 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -163,8 +163,9 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]: if self.node_data.is_parallel: futures: list[Future] = [] q: Queue = Queue() - thread_pool = GraphEngineThreadPool(max_workers=self.node_data.parallel_nums, - max_submit_count=dify_config.MAX_SUBMIT_COUNT) + thread_pool = GraphEngineThreadPool( + max_workers=self.node_data.parallel_nums, max_submit_count=dify_config.MAX_SUBMIT_COUNT + ) for index, item in enumerate(iterator_list_value): future: Future = thread_pool.submit( self._run_single_iter_parallel,