diff --git a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts index db834d4c271..f7c496c350d 100644 --- a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts +++ b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts @@ -267,28 +267,54 @@ class OpenAIAssistant_Agents implements INode { // List all runs, in case existing thread is still running if (!isNewThread) { const promise = (threadId: string) => { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { + const maxWaitTime = 30000 // Maximum wait time of 30 seconds + const startTime = Date.now() + let delay = 500 // Initial delay between retries + const maxRetries = 10 + let retries = 0 + const timeout = setInterval(async () => { - const allRuns = await openai.beta.threads.runs.list(threadId) - if (allRuns.data && allRuns.data.length) { - const firstRunId = allRuns.data[0].id - const runStatus = allRuns.data.find((run) => run.id === firstRunId)?.status - if ( - runStatus && - (runStatus === 'cancelled' || - runStatus === 'completed' || - runStatus === 'expired' || - runStatus === 'failed' || - runStatus === 'requires_action') - ) { + try { + const allRuns = await openai.beta.threads.runs.list(threadId) + if (allRuns.data && allRuns.data.length) { + const firstRunId = allRuns.data[0].id + const runStatus = allRuns.data.find((run) => run.id === firstRunId)?.status + if ( + runStatus && + (runStatus === 'cancelled' || + runStatus === 'completed' || + runStatus === 'expired' || + runStatus === 'failed' || + runStatus === 'requires_action') + ) { + clearInterval(timeout) + resolve() + } + } else { clearInterval(timeout) - resolve() + reject(new Error(`Empty Thread: ${threadId}`)) } - } else { + } catch (error: any) { + if (error.response?.status === 404) { + clearInterval(timeout) + reject(new Error(`Thread not found: ${threadId}`)) + } else if (error.response?.status === 429 && retries < maxRetries) { + retries++ + delay *= 2 + console.warn(`Rate limit exceeded, retrying in ${delay}ms...`) + } else { + clearInterval(timeout) + reject(new Error(`Unexpected error: ${error.message}`)) + } + } + + // Timeout condition to stop the loop if maxWaitTime is exceeded + if (Date.now() - startTime > maxWaitTime) { clearInterval(timeout) - resolve() + reject(new Error('Timeout waiting for thread to finish.')) } - }, 500) + }, delay) }) } await promise(threadId) @@ -576,96 +602,127 @@ class OpenAIAssistant_Agents implements INode { const promise = (threadId: string, runId: string) => { return new Promise((resolve, reject) => { + const maxWaitTime = 30000 // Maximum wait time of 30 seconds + const startTime = Date.now() + let delay = 500 // Initial delay between retries + const maxRetries = 10 + let retries = 0 + const timeout = setInterval(async () => { - const run = await openai.beta.threads.runs.retrieve(threadId, runId) - const state = run.status - if (state === 'completed') { - clearInterval(timeout) - resolve(state) - } else if (state === 'requires_action') { - if (run.required_action?.submit_tool_outputs.tool_calls) { + try { + const run = await openai.beta.threads.runs.retrieve(threadId, runId) + const state = run.status + + if (state === 'completed') { clearInterval(timeout) - const actions: ICommonObject[] = [] - run.required_action.submit_tool_outputs.tool_calls.forEach((item) => { - const functionCall = item.function - let args = {} - try { - args = JSON.parse(functionCall.arguments) - } catch (e) { - console.error('Error parsing arguments, default to empty object') - } - actions.push({ - tool: functionCall.name, - toolInput: args, - toolCallId: item.id + resolve(state) + } else if (state === 'requires_action') { + if (run.required_action?.submit_tool_outputs.tool_calls) { + clearInterval(timeout) + const actions: ICommonObject[] = [] + run.required_action.submit_tool_outputs.tool_calls.forEach((item) => { + const functionCall = item.function + let args = {} + try { + args = JSON.parse(functionCall.arguments) + } catch (e) { + console.error('Error parsing arguments, default to empty object') + } + actions.push({ + tool: functionCall.name, + toolInput: args, + toolCallId: item.id + }) }) - }) - const submitToolOutputs = [] - for (let i = 0; i < actions.length; i += 1) { - const tool = tools.find((tool: any) => tool.name === actions[i].tool) - if (!tool) continue + const submitToolOutputs = [] + for (let i = 0; i < actions.length; i += 1) { + const tool = tools.find((tool: any) => tool.name === actions[i].tool) + if (!tool) continue - // Start tool analytics - const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds) - if (shouldStreamResponse && sseStreamer) { - sseStreamer.streamToolEvent(chatId, tool.name) + // Start tool analytics + const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds) + if (shouldStreamResponse && sseStreamer) { + sseStreamer.streamToolEvent(chatId, tool.name) + } + + try { + const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, { + sessionId: threadId, + chatId: options.chatId, + input + }) + await analyticHandlers.onToolEnd(toolIds, toolOutput) + submitToolOutputs.push({ + tool_call_id: actions[i].toolCallId, + output: toolOutput + }) + usedTools.push({ + tool: tool.name, + toolInput: actions[i].toolInput, + toolOutput + }) + } catch (e) { + await analyticHandlers.onToolEnd(toolIds, e) + console.error('Error executing tool', e) + clearInterval(timeout) + reject( + new Error( + `Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Tool: ${tool.name}` + ) + ) + return + } } + const newRun = await openai.beta.threads.runs.retrieve(threadId, runId) + const newStatus = newRun?.status + try { - const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, { - sessionId: threadId, - chatId: options.chatId, - input - }) - await analyticHandlers.onToolEnd(toolIds, toolOutput) - submitToolOutputs.push({ - tool_call_id: actions[i].toolCallId, - output: toolOutput - }) - usedTools.push({ - tool: tool.name, - toolInput: actions[i].toolInput, - toolOutput - }) + if (submitToolOutputs.length && newStatus === 'requires_action') { + await openai.beta.threads.runs.submitToolOutputs(threadId, runId, { + tool_outputs: submitToolOutputs + }) + resolve(state) + } else { + await openai.beta.threads.runs.cancel(threadId, runId) + resolve('requires_action_retry') + } } catch (e) { - await analyticHandlers.onToolEnd(toolIds, e) - console.error('Error executing tool', e) clearInterval(timeout) reject( - new Error( - `Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Tool: ${tool.name}` - ) + new Error(`Error submitting tool outputs: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`) ) - break - } - } - - const newRun = await openai.beta.threads.runs.retrieve(threadId, runId) - const newStatus = newRun?.status - - try { - if (submitToolOutputs.length && newStatus === 'requires_action') { - await openai.beta.threads.runs.submitToolOutputs(threadId, runId, { - tool_outputs: submitToolOutputs - }) - resolve(state) - } else { - await openai.beta.threads.runs.cancel(threadId, runId) - resolve('requires_action_retry') } - } catch (e) { - clearInterval(timeout) - reject(new Error(`Error submitting tool outputs: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`)) } + } else if (state === 'cancelled' || state === 'expired' || state === 'failed') { + clearInterval(timeout) + reject( + new Error( + `Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Status: ${state}` + ) + ) } - } else if (state === 'cancelled' || state === 'expired' || state === 'failed') { + } catch (error: any) { + if (error.response?.status === 404 || error.response?.status === 429) { + clearInterval(timeout) + reject(new Error(`API error: ${error.response?.status} for Thread ID: ${threadId}, Run ID: ${runId}`)) + } else if (retries < maxRetries) { + retries++ + delay *= 2 // Exponential backoff + console.warn(`Transient error, retrying in ${delay}ms...`) + } else { + clearInterval(timeout) + reject(new Error(`Max retries reached. Error: ${error.message}`)) + } + } + + // Stop the loop if maximum wait time is exceeded + if (Date.now() - startTime > maxWaitTime) { clearInterval(timeout) - reject( - new Error(`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Status: ${state}`) - ) + reject(new Error('Timeout waiting for thread to finish.')) } - }, 500) + }, delay) }) }