How are you handling saving the context when the websocket disconnects?
That seems like a crucial piece of the puzzle
I'm saving the context on each iteration, at the bottom of the main loop right before breaking out of it. Is that the right approach?
hmmm, let me check in google colab
You might only want to save the state when the workflow finishes running or when it gets an InputRequiredEvent
rather than saving it on every stream event
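For reference, a minimal sketch of that checkpointing pattern. It uses stand-in classes instead of the real llama_index ones so it runs on its own: `StreamEvent`, `InputRequired`, `fake_stream`, and `run_with_checkpoints` are all hypothetical names, and `snapshot` stands in for something like `handler.ctx.to_dict(...)`.

```python
import asyncio
import json
from dataclasses import dataclass


@dataclass
class StreamEvent:
    """Stand-in for an ordinary streamed event (hypothetical)."""
    text: str


@dataclass
class InputRequired:
    """Stand-in for InputRequiredEvent (hypothetical)."""
    prefix: str


async def fake_stream(events):
    """Stand-in for handler.stream_events()."""
    for ev in events:
        await asyncio.sleep(0)
        yield ev


async def run_with_checkpoints(events, snapshot, save):
    """Consume the stream, saving state only on input-required or at the end."""
    async for ev in fake_stream(events):
        if isinstance(ev, InputRequired):
            save(snapshot())  # checkpoint, then stop until the user answers
            return "paused"
    save(snapshot())  # the stream ended: final checkpoint
    return "finished"


saved = []
state = {"step": 0}
events = [StreamEvent("a"), StreamEvent("b"), InputRequired("Proceed?"), StreamEvent("c")]
status = asyncio.run(
    run_with_checkpoints(events, lambda: json.dumps(state), saved.append)
)
print(status, len(saved))
```

The ordinary stream events never trigger a save, so only one snapshot is written before the pause.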
@Logan M I've modified the code to only save context inside the InputRequiredEvent and at the end of the workflow as you suggested. But the result is the same.
I've attached a screenshot where I set the breakpoint at the last statement that gets executed before it hangs completely. Is there anything specific I should look for? Maybe in the ctx?
hmm, in the ctx dict, do you see any values for in_progress, is_running, and waiter_id?
Something is weird here honestly. I still need to try and reproduce this, ideally outside of fastapi
@Logan M Yes, I see those values in the ctx_dict
hmmm but they are empty
(I'm not sure if that's expected or not honestly, need to dig a bit)
I'm inspecting the contents of ctx_dict right after serialization with handler.ctx.to_dict(serializer=JsonSerializer())
and in_progress is still empty. Maybe that's expected as you mentioned...
hmm, I think I got stuck on a slightly different issue: await handler waits forever
hmm
So, I can't reproduce, but maybe we can compare context data
lemme just format this nicely
So when I save the context before restoring, I get something like this (using pprint to print the context dict, cut out some of the fat)
{'globals': {'agents': '["Agent"]',
             'current_agent_name': '"Agent"',
             'handoff_output_prompt': '"Agent {to_agent} is now handling the '
                                      'request due to the following reason: '
                                      '{reason}.\\nPlease continue with the '
                                      'current request."',
             'memory': '{"__is_component": true, "value": {"chat_store": '
                       '{"store": {"chat_history": [{"role": "user", '
                       '"additional_kwargs": {}, "blocks": [{"block_type": '
                       '"text", "text": "I want to proceed with the dangerous '
                       'task."}]}]}, "class_name": "SimpleChatStore"}, '
                       '"chat_store_key": "chat_history", "token_limit": '
                       '96000, "class_name": "ChatMemoryBuffer"}, '
                       '"qualified_name": '
                       '"llama_index.core.memory.chat_memory_buffer.ChatMemoryBuffer"}',
             'num_tool_calls': '1',
             'scratchpad': '[{"__is_pydantic": true, "value": {"role": '
                           '"assistant", "additional_kwargs": {"tool_calls": '
                           '[{"index": 0, "id": '
                           '"call_gASClWjTwjd45RkalhzwruvQ", "function": '
                           '{"arguments": "{}", "name": "dangerous_task"}, '
                           '"type": "function"}]}, "blocks": [{"block_type": '
                           '"text", "text": ""}]}, "qualified_name": '
                           '"llama_index.core.base.llms.types.ChatMessage"}]',
             'state': '{}',
             'user_msg_str': '"I want to proceed with the dangerous task."'},
 'in_progress': {'call_tool': ['{"__is_pydantic": true, "value": {"tool_name": '
                               '"dangerous_task", "tool_kwargs": {}, '
                               '"tool_id": "call_gASClWjTwjd45RkalhzwruvQ"}, '
                               '"qualified_name": '
                               '"llama_index.core.agent.workflow.workflow_events.ToolCall"}'],
                 'init_run': [],
                 'parse_agent_output': [],
                 'run_agent_step': [],
                 'setup_agent': []},
 'is_running': True,
 'queues': {'32fe8d6c-4699-42a3-8dad-9d76cfdc19c8': '["{\\"__is_pydantic\\": ' ...."]",
            '_done': '[]',
            'aggregate_tool_results': '[]',
            'call_tool': '[]',
            'init_run': '[]',
            'parse_agent_output': '[]',
            'run_agent_step': '[]',
            'setup_agent': '[]'},
 'stepwise': False,
 'streaming_queue': '[]',
 'waiter_id': '32fe8d6c-4699-42a3-8dad-9d76cfdc19c8'}
Does that seem similar to your context dict?
This is the code I used, fyi
from llama_index.llms.openai import OpenAI
import asyncio

llm = OpenAI(model="gpt-4o-mini", api_key="sk-...")

from llama_index.core.workflow import (
    Context,
    InputRequiredEvent,
    HumanResponseEvent,
)
from llama_index.core.agent.workflow import AgentWorkflow


async def dangerous_task(ctx: Context) -> str:
    """A dangerous task that requires human confirmation."""
    ctx.write_event_to_stream(
        InputRequiredEvent(
            prefix="Are you sure you want to proceed?",
            user_name="Logan",
        )
    )
    response = await ctx.wait_for_event(
        HumanResponseEvent, requirements={"user_name": "Logan"}
    )
    if response.response == "yes":
        return "Dangerous task completed successfully."
    else:
        return "Dangerous task aborted."


async def main():
    workflow = AgentWorkflow.from_tools_or_functions(
        [dangerous_task],
        llm=llm,
        system_prompt="You are a helpful assistant that can perform dangerous tasks.",
        verbose=True,
    )
    from llama_index.core.workflow import JsonSerializer

    handler = workflow.run(user_msg="I want to proceed with the dangerous task.")
    input_ev = None
    ctx_dict = None
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            input_ev = event
            # save the context somewhere for later
            ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
            break

    import pprint

    pprint.pprint(ctx_dict)

    # get the response from the user
    response_str = "yes"

    # restore the workflow
    restored_ctx = Context.from_dict(
        workflow, ctx_dict, serializer=JsonSerializer()
    )
    handler = workflow.run(ctx=restored_ctx)
    handler.ctx.send_event(
        HumanResponseEvent(
            response=response_str,
            user_name=input_ev.user_name,
        )
    )
    async for event in handler.stream_events():
        pass
        # print(type(event))
        # print("---")

    print("awaiting response", flush=True)
    response = await handler
    print("Got response: ", str(response), flush=True)
    return response


if __name__ == "__main__":
    asyncio.run(main())
@Logan M My code is slightly different, but should achieve the same results I guess. I use recursion, where I have a stream_events() function that runs the main loop and calls a utility function that handles user input, which in turn leads the user back into the stream_events() loop. That seems to work fine (I always exit loops to avoid any race conditions...)
I've attached a json file of my ctx contents, a snapshot taken right after serialization. Do you see anything out of the ordinary? It looks similar.
I'm wondering whether a blocking condition could cause the hang the second time around, when the user enters the loop again. The function handle_websocket_receive() handles user input, and then leads the user back into the loop, where the attempt to load context fails.
hmm, maybe a better way to do this is
- If input is required, save the ctx, then either exit the handle stream loop or use handler.cancel_run() to cancel the current run
- Then once you have the user input, restart the workflow handler?
still fixing some other issues I found locally though, unsure yet if they are related or not
ok, fixed a kind of spooky bug that might have actually been causing you issues. I wasn't getting stuck on the async for event in ... in the event stream, but I WAS getting stuck on await handler because the tool call step was getting triggered twice -- are you sure that's not also where you got stuck?
@Logan M What's the best way to test your PR? There's no release I guess... I have a requirements.txt in my docker container
I've just tested your suggestion to use handler.cancel_run() instead of exiting the loop. No changes. I've also verified that the line where it gets stuck is async for event in handler.stream_events() in my case.
Maybe your fix is related. I just have to figure out how to test it
If you want to test it, you'll have to clone and install the llama-index-core folder
But it should be able to merge and release later tonight
I'm just surprised nobody else has mentioned this issue since a FastAPI websockets setup must be very common.
In my experience, websockets in the wild aren't used often actually
How do you then handle realtime conversation with an agent?
Normal rest, cache and load the state
The websockets package in fastapi is also very young -- missing lots of features. I found it very unstable.
I actually replaced it with socket.io in my prod app, mounting it inside my fastapi app
You might not run into the same, but something to consider
When you say it's unstable, are you talking about the starlette websockets package?
Yea that's the one
And by unstable, I mean, I found it randomly disconnects, or has trouble maintaining long connections. Was very odd honestly, hard to debug lol
Was connecting a python backend to a next js frontend
It's funny because in my case it has been working great. I have a react client and it seems to work just fine, except for this specific scenario...which doesn't seem to be related to websockets (but I may be wrong... so it's worth a shot)
I'll first try your PR once you release it. Thanks for your help!
@Logan M I've put together a basic script following the dangerous task example you posted above, only I've refactored it a bit to simulate a realtime scenario (without websockets) as we discussed yesterday. This is using the latest llamaindex from source, which includes your PR.
There are some weird behaviors like the agent asking all questions twice (I wonder if this is a bug in my code...).
It then generates a report and executes the response = await handler statement multiple times at the end of the workflow, but using the answers provided the first time.
The script is very straightforward: I'm exiting the main loop when user input is required, then reentering the loop later and restoring the context (which seems to work fine).
thanks for the script, taking a look
Yeah, I just edited my comment to provide more context
hmm, yea this code is definitely recursive, because handle_stream_events() -> handle_user_input() -> handle_stream_events() again -- since handle_stream_events() is called twice, things will get very weird
Going to refactor slightly
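A rough sketch of the non-recursive shape: one flat loop alternates between streaming and asking the user, so the stream handler is never re-entered from inside itself. Everything here is a stand-in and not the script from the thread; `stream_until_pause`, `ask_user`, and `drive` are hypothetical names.

```python
import asyncio


async def stream_until_pause(pending_questions):
    """Stand-in for one pass over handler.stream_events() (hypothetical)."""
    await asyncio.sleep(0)
    if pending_questions:
        return ("input_required", pending_questions[0])
    return ("done", None)


async def ask_user(question, answers):
    """Stand-in for reading the user's reply, e.g. from a websocket."""
    await asyncio.sleep(0)
    return answers[question]


async def drive(questions, answers):
    pending = list(questions)
    transcript = []
    while True:  # flat loop instead of stream -> input -> stream recursion
        kind, question = await stream_until_pause(pending)
        if kind == "done":
            return transcript
        reply = await ask_user(question, answers)
        transcript.append((question, reply))
        pending.pop(0)  # the "workflow" resumes with this question resolved


transcript = asyncio.run(
    drive(["Proceed?", "Really?"], {"Proceed?": "yes", "Really?": "yes"})
)
print(transcript)
```

Each question is asked exactly once because only one stream pass is ever active at a time.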
@Logan M So I refactored the example to remove recursion completely. Replaced it with an outer loop. The workflow completes successfully except there's a weird error at the end of the workflow, which I've seen before in my tests: task: <Task finished name='call_tool' coro=<Workflow._start.<locals>._task() done, defined at /Users/ariel/Projects/llama_index/llama-index-core/llama_index/core/workflow/workflow.py:226> exception=KeyError('call_tool')>
I've seen this key error before a few times. I've attached the refactored code and a clip. I can send the traceback log if it's helpful.
seems like something isn't try/excepting the CancelledError -- doesn't matter too much, it's cosmetic (I'm pretty sure), but happy to clean it up, I'll take a look when I get a chance
Then nothing to worry about
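For context, this is the general asyncio pattern for handling cancellation inside a task, using only the standard library. It is a generic illustration, not the llama_index fix; `step` and `main` are hypothetical names.

```python
import asyncio


async def step(log):
    try:
        await asyncio.sleep(3600)  # stands in for a step waiting on an event
    except asyncio.CancelledError:
        log.append("cleaned up")  # tidy up, then let the cancellation propagate
        raise


async def main():
    log = []
    task = asyncio.create_task(step(log))
    await asyncio.sleep(0)  # let the task start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancel observed")
    return log


log = asyncio.run(main())
print(log)
```

Awaiting the cancelled task and catching `CancelledError` there is what keeps the cancellation from surfacing as an unhandled task error.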
@Logan M Hey! Sorry to bug you again. Just wanted to mention that I'm testing v0.12.21 and there are still certain scenarios where it seems to hang at response = await handler (the issue you mentioned the other day). I'm trying to reproduce it so I can send it
Oh man
Yea if you can reproduce again, super helpful
@Logan M So here's the simple dangerous task example again. No recursion, just a simple loop. Right before getting stuck at response = await handler there's an exception in the logs. Maybe it's not relevant, just an unhandled one... the flow continues anyway:
Task exception was never retrieved
future: <Task finished name='_done' coro=<Workflow._start.<locals>._task() done, defined at /opt/homebrew/Caskroom/miniforge/base/envs/llamaindex-debug/lib/python3.13/site-packages/llama_index/core/workflow/workflow.py:258> exception=WorkflowDone()>
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/llamaindex-debug/lib/python3.13/site-packages/llama_index/core/workflow/workflow.py", line 303, in _task
    new_ev = await instrumented_step(**kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/llamaindex-debug/lib/python3.13/site-packages/llama_index/core/instrumentation/dispatcher.py", line 368, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/llamaindex-debug/lib/python3.13/site-packages/llama_index/core/workflow/workflow.py", line 597, in _done
    raise WorkflowDone
llama_index.core.workflow.errors.WorkflowDone
hmm, debugging a bit, feels like an issue with how wait_for_event() is working. A combination of some race conditions and a few other things
When you run a workflow from a restored ctx, any function that was in-progress starts again from the top. I wonder if this is causing some side-effect that I don't understand yet when there are multiple tools that need approval
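To make the "starts again from the top" point concrete, here is a toy model (not llama_index internals) of a tool that does something before it waits. `tool` and `prompts_sent` are hypothetical names; the point is only that code ahead of the wait runs once per start, so a restored run repeats it.

```python
import asyncio


async def tool(state, answer=None):
    """Stand-in for a tool that pauses for approval (hypothetical)."""
    # Side effect that happens before the wait point.
    state["prompts_sent"] = state.get("prompts_sent", 0) + 1
    if answer is None:
        return None  # the run is "suspended" here and state gets saved
    return "done" if answer == "yes" else "aborted"


async def main():
    state = {}
    await tool(state)                  # first run: asks, then pauses
    saved = dict(state)                # snapshot taken while the tool is in progress
    result = await tool(saved, "yes")  # restored run starts from the top again
    return saved["prompts_sent"], result


prompts_sent, result = asyncio.run(main())
print(prompts_sent, result)
```

In this toy model the prompt is sent twice for a single approval, which is the kind of repeated side effect that could get confusing once several tools are waiting at the same time.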
I really would just handle things while the workflow is running, save state and resume as needed when a disconnect happens. It kind of feels like this code is more complicated than it needs to be
This example works, and feels much easier to follow and understand (and if you imagine state is actually a database or redis that's partitioned by user-ids, then it makes a ton of sense)
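A minimal sketch of what a per-user state store could look like, with a plain dict standing in for Redis or a database. `InMemoryStateStore` and the `ctx:{user_id}` key scheme are hypothetical, and it assumes the saved context dict is JSON-serializable (as the `JsonSerializer` output above appears to be).

```python
import json


class InMemoryStateStore:
    """Dict-backed stand-in for Redis or a database (hypothetical helper)."""

    def __init__(self):
        self._data = {}

    def save(self, user_id, ctx_dict):
        # One key per user, so concurrent users never share a context.
        self._data[f"ctx:{user_id}"] = json.dumps(ctx_dict)

    def load(self, user_id):
        raw = self._data.get(f"ctx:{user_id}")
        return None if raw is None else json.loads(raw)

    def delete(self, user_id):
        self._data.pop(f"ctx:{user_id}", None)


store = InMemoryStateStore()
store.save("user-1", {"is_running": True, "waiter_id": "abc"})
store.save("user-2", {"is_running": False, "waiter_id": None})
print(store.load("user-1"))
```

On reconnect, the handler would load the dict for that user, rebuild the context from it, and delete the entry once the workflow finishes.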
I need to dig deeper I think beyond that
Hey @Logan M Thanks for your reply and the refactored code. I'll give it a shot! Thanks a lot!