
Deploying Agent Workflows with Human-in-the-Loop and Function Calling in FastAPI over Websockets

Hi everyone!
Has anyone here succeeded in deploying an agent workflow with human-in-the-loop and function calling in a FastAPI environment over websockets? I have my workflow running just great, as long as the user stays within the main stream_events loop and completes the workflow in one session. If the user leaves in the middle (e.g. navigating away) and the websocket gets disconnected, my entire workflow breaks.
I've been trying to implement the save-context scenario with no success. I've followed the documentation over and over, still the same result: after restoring the context, execution hangs at async for event in handler.stream_events(). I've attached a screenshot of my debugging session.

I've tried to create a minimal version with Google Colab as @Logan M suggested, but I'm unable to reproduce a real asynchronous scenario where a user exits the main loop and reenters at a later stage.
Attachment
Screenshot_2025-02-26_at_11.44.23_2.png
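For reference, this is roughly the shape of the setup I'm describing (a minimal sketch, not my actual code; the endpoint path and flow are just illustrative):
Plain Text
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from llama_index.core.workflow import (
    HumanResponseEvent,
    InputRequiredEvent,
    JsonSerializer,
)

app = FastAPI()


@app.websocket("/ws")
async def agent_ws(websocket: WebSocket):
    await websocket.accept()
    user_msg = await websocket.receive_text()
    handler = workflow.run(user_msg=user_msg)  # workflow is built elsewhere at startup
    try:
        async for event in handler.stream_events():
            if isinstance(event, InputRequiredEvent):
                # ask the human over the socket, then feed the answer back in
                await websocket.send_text(event.prefix)
                answer = await websocket.receive_text()
                handler.ctx.send_event(
                    HumanResponseEvent(response=answer, user_name=event.user_name)
                )
        result = await handler
        await websocket.send_text(str(result))
    except WebSocketDisconnect:
        # this is the problem case: the run is still in flight, so I try to
        # save the context here and restore it in a new run on reconnect
        ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
        # ...persist ctx_dict somewhere, keyed by user/session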
60 comments
How are you handling saving the context when the websocket disconnects?
That seems like a crucial piece of the puzzle
I'm saving the context with each iteration at the bottom, right before breaking from the main loop. Is that the right approach?
Attachment
image.png
hmmm, let me check in google colab

You might only want to save the state when the workflow finishes running or when it gets an InputRequiredEvent
rather than saving it on every stream event
@Logan M I've modified the code to only save context inside the InputRequiredEvent and at the end of the workflow as you suggested. But the result is the same πŸ˜₯
I've attached a screenshot where I set the breakpoint at the last statement that gets executed before it hangs completely. Is there anything specific I should look for? Maybe in the ctx ?
Attachments
Screenshot_2025-02-26_at_16.39.05.png
image.png
hmm, in the ctx dict, do you see any values for in_progress, is_running, and waiter_id ?
Something is weird here honestly πŸ€” I still need to try and reproduce this
ideally outside of fastapi
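If you want to double-check, something like this should show them (just an inspection snippet, assuming you still have the handler in hand):
Plain Text
from llama_index.core.workflow import JsonSerializer

ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
print(ctx_dict["is_running"])
print(ctx_dict["waiter_id"])
print(ctx_dict["in_progress"])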
@Logan M Yes, I see those values in the ctx_dict
Attachment
image.png
hmmm but they are empty πŸ€”
You're right
(im not sure if thats expected or not honestly, need to dig a bit)
I'm inspecting the contents of ctx_dict right after serialization with handler.ctx.to_dict(serializer=JsonSerializer()) and in_progress is still empty. Maybe that's expected as you mentioned...
hmm, I think I got stuck on a slightly different issue, await handler waits forever πŸ˜… hmm
debugging now
So, I can't reproduce, but maybe we can compare context data
lemme just format this nicely
So when I save the context before restoring, I get something like this (using pprint to print the context dict, cut out some of the fat)

Plain Text
{'globals': {'agents': '["Agent"]',
             'current_agent_name': '"Agent"',
             'handoff_output_prompt': '"Agent {to_agent} is now handling the '
                                      'request due to the following reason: '
                                      '{reason}.\\nPlease continue with the '
                                      'current request."',
             'memory': '{"__is_component": true, "value": {"chat_store": '
                       '{"store": {"chat_history": [{"role": "user", '
                       '"additional_kwargs": {}, "blocks": [{"block_type": '
                       '"text", "text": "I want to proceed with the dangerous '
                       'task."}]}]}, "class_name": "SimpleChatStore"}, '
                       '"chat_store_key": "chat_history", "token_limit": '
                       '96000, "class_name": "ChatMemoryBuffer"}, '
                       '"qualified_name": '
                       '"llama_index.core.memory.chat_memory_buffer.ChatMemoryBuffer"}',
             'num_tool_calls': '1',
             'scratchpad': '[{"__is_pydantic": true, "value": {"role": '
                           '"assistant", "additional_kwargs": {"tool_calls": '
                           '[{"index": 0, "id": '
                           '"call_gASClWjTwjd45RkalhzwruvQ", "function": '
                           '{"arguments": "{}", "name": "dangerous_task"}, '
                           '"type": "function"}]}, "blocks": [{"block_type": '
                           '"text", "text": ""}]}, "qualified_name": '
                           '"llama_index.core.base.llms.types.ChatMessage"}]',
             'state': '{}',
             'user_msg_str': '"I want to proceed with the dangerous task."'},
 'in_progress': {'call_tool': ['{"__is_pydantic": true, "value": {"tool_name": '
                               '"dangerous_task", "tool_kwargs": {}, '
                               '"tool_id": "call_gASClWjTwjd45RkalhzwruvQ"}, '
                               '"qualified_name": '
                               '"llama_index.core.agent.workflow.workflow_events.ToolCall"}'],
                 'init_run': [],
                 'parse_agent_output': [],
                 'run_agent_step': [],
                 'setup_agent': []},
 'is_running': True,
 'queues': {'32fe8d6c-4699-42a3-8dad-9d76cfdc19c8': '["{\\"__is_pydantic\\": ..."]',
            '_done': '[]',
            'aggregate_tool_results': '[]',
            'call_tool': '[]',
            'init_run': '[]',
            'parse_agent_output': '[]',
            'run_agent_step': '[]',
            'setup_agent': '[]'},
 'stepwise': False,
 'streaming_queue': '[]',
 'waiter_id': '32fe8d6c-4699-42a3-8dad-9d76cfdc19c8'}
Does that seem similar to your context dict?
This is the code I used, fyi
Plain Text
from llama_index.llms.openai import OpenAI
import asyncio
llm = OpenAI(model="gpt-4o-mini", api_key="sk-...")

from llama_index.core.workflow import (
    Context,
    InputRequiredEvent,
    HumanResponseEvent,
)
from llama_index.core.agent.workflow import AgentWorkflow


async def dangerous_task(ctx: Context) -> str:
    """A dangerous task that requires human confirmation."""
    ctx.write_event_to_stream(
        InputRequiredEvent(
            prefix="Are you sure you want to proceed?",
            user_name="Logan",
        )
    )

    response = await ctx.wait_for_event(
        HumanResponseEvent, requirements={"user_name": "Logan"}
    )
    if response.response == "yes":
        return "Dangerous task completed successfully."
    else:
        return "Dangerous task aborted."


async def main():
    workflow = AgentWorkflow.from_tools_or_functions(
        [dangerous_task],
        llm=llm,
        system_prompt="You are a helpful assistant that can perform dangerous tasks.",
        verbose=True,
    )

    from llama_index.core.workflow import JsonSerializer

    handler = workflow.run(user_msg="I want to proceed with the dangerous task.")

    input_ev = None
    ctx_dict = None
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            input_ev = event  

            # save the context somewhere for later
            ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
            break
    
    import pprint
    pprint.pprint(ctx_dict)



    # get the response from the user
    response_str = "yes"

    # restore the workflow
    restored_ctx = Context.from_dict(
        workflow, ctx_dict, serializer=JsonSerializer()
    )

    handler = workflow.run(ctx=restored_ctx)
    handler.ctx.send_event(
        HumanResponseEvent(
            response=response_str,
            user_name=input_ev.user_name,
        )
    )

    async for event in handler.stream_events():
        pass
        # print(type(event))
        # print("---")

    print("awaiting response", flush=True)
    response = await handler
    print("Got response: ", str(response), flush=True)
    return response


if __name__ == "__main__":
    asyncio.run(main())
@Logan M My code is slightly different, but it should achieve the same results I guess. I use recursion, where I have a stream_events() function that runs the main loop and calls a utility function that handles user input, which in turn leads the user back into the stream_events() loop. That seems to work fine (I always exit loops to avoid any race conditions...)
I've attached a JSON file of my ctx contents, a snapshot taken right after serialization. Do you see anything out of the ordinary? It looks similar.
I'm wondering whether a blocking condition could cause the hang the second time around, when the user enters the loop again. The function handle_websocket_receive() handles user input and then leads the user back into the loop, where the attempt to load the context fails.
Attachments
Screenshot_2025-02-26_at_18.15.30.png
Screenshot_2025-02-26_at_18.13.34.png
hmm, maybe a better way to do this is
  1. If input is required, save the ctx, but then either exit the handle stream loop, or use handler.cancel_run() to cancel the current run
  2. then once you have the user input, restart the workflow handler? πŸ€”
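Roughly like this (just a sketch against the same dangerous-task example; persisting ctx_dict and getting the user's answer are stubbed out, and handler is the handler from the current workflow.run(...)):
Plain Text
from llama_index.core.workflow import (
    Context,
    HumanResponseEvent,
    InputRequiredEvent,
    JsonSerializer,
)

# 1) stream until input is required, then save the context and cancel the run
input_ev = None
ctx_dict = None
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        input_ev = event
        ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
        await handler.cancel_run()
        break

# ...user leaves, comes back later with an answer (user_answer is a placeholder)...

# 2) restore the context, start a fresh run, and feed the answer in
restored_ctx = Context.from_dict(workflow, ctx_dict, serializer=JsonSerializer())
handler = workflow.run(ctx=restored_ctx)
handler.ctx.send_event(
    HumanResponseEvent(response=user_answer, user_name=input_ev.user_name)
)
async for event in handler.stream_events():
    pass
result = await handler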
still fixing some other issues I found locally though, unsure yet if they are related or not
ok, fixed a kind of spooky bug that might have actually been causing you issues. I wasn't getting stuck on the async for event in ... in the event stream, but I WAS getting stuck on await handler because the tool call step was getting triggered twice -- are you sure that's not also where you got stuck?
@Logan M What's the best way to test your PR? There's no release I guess... I have a requirements.txt in my docker container πŸ€”
I've just tested your suggestion to use handler.cancel_run() instead of exiting the loop. No changes. I've also verified that the line where it gets stuck is async for event in handler.stream_events() in my case.
Maybe your fix is related. I just have to figure out how to test it
If you want to test it, you'll have to clone and install the llama-index-core folder
But it should be able to merge and release later tonight
I'm just surprised nobody else has mentioned this issue since a FastAPI websockets setup must be very common.
In my experience, websockets in the wild aren't used often actually
How do you then handle realtime conversation with an agent? πŸ‘€
From a client app
Normal REST: cache and load the state
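Roughly what I mean (a rough sketch only; the endpoint shape, the hard-coded user_name, and the in-memory SESSIONS dict are placeholders, and in prod you'd cache the serialized context somewhere persistent, keyed by user/session):
Plain Text
from fastapi import FastAPI
from llama_index.core.workflow import (
    Context,
    HumanResponseEvent,
    InputRequiredEvent,
    JsonSerializer,
)

app = FastAPI()
SESSIONS: dict[str, dict] = {}  # session_id -> serialized ctx (placeholder store)


@app.post("/chat/{session_id}")
async def chat(session_id: str, message: str):
    if session_id in SESSIONS:
        # resume a previously parked run and feed the user's answer in
        ctx = Context.from_dict(workflow, SESSIONS[session_id], serializer=JsonSerializer())
        handler = workflow.run(ctx=ctx)  # workflow is built once at startup
        handler.ctx.send_event(HumanResponseEvent(response=message, user_name="user"))
    else:
        handler = workflow.run(user_msg=message)

    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            # park the run: save state, cancel, and hand the question to the client
            SESSIONS[session_id] = handler.ctx.to_dict(serializer=JsonSerializer())
            await handler.cancel_run()
            return {"needs_input": True, "prompt": event.prefix}

    result = await handler
    SESSIONS.pop(session_id, None)
    return {"needs_input": False, "response": str(result)}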
The websockets package in fastapi is also very young -- missing lots of features. I found it very unstable.

I actually replaced it with socket.io in my prod app, mounting it inside my fastapi app
You might not run into the same, but something to consider
When you say it's unstable, are you talking about the starlette websockets package?
I'll check the socket.io package. I guess this is the one? https://pypi.org/project/python-socketio/
Yea that's the one

And by unstable, I mean, I found it randomly disconnects, or has trouble maintaining long connections. Was very odd honestly, hard to debug lol
Was connecting a python backend to a next js frontend
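FWIW, mounting it looks roughly like this (a minimal sketch; the event names and CORS setting are just examples):
Plain Text
import socketio
from fastapi import FastAPI

fastapi_app = FastAPI()

# python-socketio's async server, wrapped as an ASGI app around the FastAPI app
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
app = socketio.ASGIApp(sio, other_asgi_app=fastapi_app)  # point uvicorn at `app`


@sio.event
async def connect(sid, environ):
    print("client connected:", sid)


@sio.on("user_message")
async def user_message(sid, data):
    # this is where you'd hand the message to the agent workflow
    await sio.emit("agent_event", {"text": f"echo: {data}"}, to=sid)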
It's funny because in my case it has been working great. I have a react client and it seems to work just fine, except for this specific scenario...which doesn't seem to be related to websockets (but I may be wrong... so it's worth a shot)
I'll first try your PR once you release it. Thanks for your help!
@Logan M I've put together a basic script following the dangerous task example you posted above, only I've refactored it a bit to simulate a realtime scenario (without websockets) as we discussed yesterday. This is using the latest llamaindex from source that includes your PR.
There are some weird behaviors, like the agent asking all questions twice (I wonder if this is a bug in my code...).
It then generates a report and executes the response = await handler statement multiple times at the end of the workflow, but considering the answers provided the first time.
The script is very straightforward: I'm exiting the main loop when user input is required, re-entering the loop later, and restoring the context (which seems to work fine).
And here's the output
thanks for the script, taking a look πŸ‘
Yeah, I just edited my comment to provide more context
hmm, yea this code is definitely recursive, because handle_stream_events() -> handle_user_input() -> handle_stream_events() again -- since handle_stream_events() is called twice, things will get very weird

Going to refactor slightly
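Something along these lines, a sketch of the non-recursive shape (first_message and get_user_answer are placeholders for however the input actually arrives):
Plain Text
from llama_index.core.workflow import (
    Context,
    HumanResponseEvent,
    InputRequiredEvent,
    JsonSerializer,
)

# outer loop: one pass per "turn", no recursion between the two helpers
handler = workflow.run(user_msg=first_message)
while True:
    input_ev = None
    ctx_dict = None
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            input_ev = event
            ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
            break  # leave the stream loop instead of calling it again from a handler

    if input_ev is None:
        break  # no more input needed, let the run finish

    answer = get_user_answer(input_ev.prefix)  # placeholder for user input
    restored_ctx = Context.from_dict(workflow, ctx_dict, serializer=JsonSerializer())
    handler = workflow.run(ctx=restored_ctx)
    handler.ctx.send_event(
        HumanResponseEvent(response=answer, user_name=input_ev.user_name)
    )

result = await handler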
@Logan M So I refactored the example to remove recursion completely. Replaced it with an outer loop. The workflow completes successfully except there's a weird error at the end of the workflow, which I've seen before in my tests: task: <Task finished name='call_tool' coro=<Workflow._start.<locals>._task() done, defined at /Users/ariel/Projects/llama_index/llama-index-core/llama_index/core/workflow/workflow.py:226> exception=KeyError('call_tool')>
I've seen this key error before a few times. I've attached the refactored code and a clip. I can send the traceback log if it's helpful.
seems like something isn't try/excepting the CancelledError -- doesn't matter too much, it's cosmetic (I'm pretty sure), but happy to clean it up, I'll take a look when I get a chance
Then nothing to worry about πŸ™‚
@Logan M Hey! Sorry to bug you again. Just wanted to mention that I'm testing v0.12.21 and there are still certain scenarios where it seems to hang at response = await handler, the issue you mentioned the other day. I'm trying to reproduce it so I can send it
Oh man πŸ˜… Yea if you can reproduce again, super helpful
@Logan M So here's the simple dangerous task example again. No recursion, just a simple loop. Right before getting stuck at response = await handler there's an exception in the logs. Maybe it's not relevant, just an unhandled one... the flow continues anyway:
Plain Text
Task exception was never retrieved
future: <Task finished name='_done' coro=<Workflow._start.<locals>._task() done, defined at /opt/homebrew/Caskroom/miniforge/base/envs/llamaindex-debug/lib/python3.13/site-packages/llama_index/core/workflow/workflow.py:258> exception=WorkflowDone()>
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/llamaindex-debug/lib/python3.13/site-packages/llama_index/core/workflow/workflow.py", line 303, in _task
    new_ev = await instrumented_step(**kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/llamaindex-debug/lib/python3.13/site-packages/llama_index/core/instrumentation/dispatcher.py", line 368, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/llamaindex-debug/lib/python3.13/site-packages/llama_index/core/workflow/workflow.py", line 597, in _done
    raise WorkflowDone
llama_index.core.workflow.errors.WorkflowDone
hmm, debugging a bit, feels like an issue with how wait_for_event() is working πŸ€” A combination of some race conditions and a few other things

When you run a workflow from a restored ctx, any function that was in-progress starts again from the top. I wonder if this is causing some side-effect that I don't understand yet when there are multiple tools that need approval
I really would just handle things while the workflow is running, save state and resume as needed when a disconnect happens. It kind of feels like this code is more complicated than it needs to be πŸ˜…

This example works, and feels much easier to follow and understand (and if you imagine the state is actually in Redis, partitioned by user IDs, then it makes a ton of sense)

I need to dig deeper I think beyond that
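And for the Redis-partitioned-by-user-id part, I mean roughly this (a sketch using redis-py's asyncio client; the key naming is arbitrary):
Plain Text
import json

import redis.asyncio as redis
from llama_index.core.workflow import Context, JsonSerializer

r = redis.Redis()  # assumes a local Redis instance


async def save_ctx(user_id: str, handler) -> None:
    ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
    await r.set(f"agent-ctx:{user_id}", json.dumps(ctx_dict))


async def load_ctx(user_id: str, workflow) -> Context | None:
    raw = await r.get(f"agent-ctx:{user_id}")
    if raw is None:
        return None
    return Context.from_dict(workflow, json.loads(raw), serializer=JsonSerializer())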
Hey @Logan M Thanks for your reply and the refactored code. I'll give it a shot! Thanks a lot! πŸ™‚