Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Problem
At our company, we use LlamaIndex Workflows for chat bot services. We are encountering a very subtle race condition bug where certain events in the Workflow's event stream are missing.
Root Cause
After some debugging, we've narrowed the root cause down to the
set_workflow_state
function of the Workflow Service in Llama Deploy. This function is run when the workflow has completed. It's purpose is to update the state of the workflow on the control plane. However, a subtle bug can occur when the workflow is finished, yet there are still streaming messages that the control plane hasn't processed.Details
In the function
set_workflow_state
, the variablesession_state
contains numerous keys, one for the session ID, but also many for the tasks, results and streams:For example, there could be keys for:
01d59890-f639-448b-984b-b97e113d0d41
(session ID)504b8a6d-b846-4bbb-b684-a2b2051fcd81
(task ID)stream_504b8a6d-b846-4bbb-b684-a2b2051fcd81
result_504b8a6d-b846-4bbb-b684-a2b2051fcd81
The current code works by:
session_state = await self.get_session_state(current_state.session_id)
01d59890-f639-448b-984b-b97e113d0d41
(i.e.,current_state.session_id
).session_state[current_state.session_id] = workflow_state.model_dump_json()
await self.update_session_state(current_state.session_id, session_state)
This introduces a possible race condition where between steps 1 and 3, the value for other keys, namely the
stream_504b8a6d-b846-4bbb-b684-a2b2051fcd81
can change. In particular, additional messages may have been received, but are not processed because they are replaced in step 3.Solution
The modified code works by only updating the value for the key
01d59890-f639-448b-984b-b97e113d0d41
(the session ID), without changing the other values for the other keys.