Conversation

@BoBer78 (Collaborator) commented Oct 20, 2025

Closes #580 #566 #352

Main things

  • Made the switch to the Response API.
  • The Messages DB schema is now completely different --> it mimics the Response API.
  • Switched to Vercel v5 --> it has the same schema as the Response API.
  • Refactored HIL to only allow Accept / Reject.
  • Fixed some frontend bugs.

DB Schema change

IMPORTANT: if you want to back up your local DB before running the alembic upgrade, this is what I used:

docker exec -i <CONTAINER_ID> pg_dump -U postgres -d neuroagent -F c -v > backup.dump

docker exec -i <CONTAINER_ID> pg_restore -U postgres -d neuroagent -v --clean < backup.dump

I only kept the mandatory fields so that we can send the messages back to OpenAI. Some non-mandatory info is lost in the process.

The DB messages go from User, AI_TOOL, TOOL, and AI_MESSAGE to only User and Assistant. Everything is then stored in the Parts of the messages:

  • User messages only have one Message part.
  • Assistant messages can have Reasoning, Tool_call, Tool_call_output, and Message parts (any number, in any order).
    One Assistant message now corresponds to one full turn of the agent loop in AgentRoutine (see the sketch below).
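
For illustration, here is a hedged sketch of what one assistant turn looks like under the new schema. The concrete values are invented; each part's output holds the raw Response API item:

# A sketch of one full agent turn (field values are made up; the part
# fields mirror the Parts table shown later in this PR):
assistant_message = {
    "role": "assistant",
    "parts": [
        {"order_index": 0, "type": "reasoning",
         "output": {"type": "reasoning", "summary": []}},
        {"order_index": 1, "type": "function_call",
         "output": {"type": "function_call", "call_id": "call_123",
                    "name": "get_weather", "arguments": "{}"}},
        {"order_index": 2, "type": "function_call_output",
         "output": {"type": "function_call_output", "call_id": "call_123",
                    "output": "22C"}},
        {"order_index": 3, "type": "message",
         "output": {"type": "message", "role": "assistant",
                    "content": [{"type": "output_text", "text": "It is 22C."}]}},
    ],
}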

The most complex part of the PR lies in the migration script.

I tested it extensively: you can write some messages, upgrade to the new schema, write some more messages, downgrade, and repeat ten times, and it works. I did not test all the weird edge cases, like stopping a HIL message at a very specific timing. A sketch of the round-trip I ran is below.
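
Roughly, the round-trip looks like this when driven from alembic's Python API (a sketch; the alembic.ini path is an assumption about the repo layout):

# Sketch of the tested upgrade/downgrade cycle; adjust the config path.
from alembic import command
from alembic.config import Config

cfg = Config("backend/alembic.ini")
for _ in range(10):
    command.upgrade(cfg, "head")  # old role-based rows -> messages + parts
    # ... write a few messages against the new schema here ...
    command.downgrade(cfg, "-1")  # parts -> old role-based rows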

Response API change

IMPORTANT: OpenRouter's Response API is still in beta. When streaming, it sometimes sends the chunks in a different order, and the main LLM is not compatible with it for now. Non-streamed responses and parsed structured output work fine.

With the new DB schema, the only thing we have to do is:
https://github.com/openbraininstitute/neuroagent/blob/f6250e7cb16586d049e05bb9e5b72ea08b23c1a0/backend/src/neuroagent/utils.py#L28C1-L40C27
And we can send it to OpenAI.
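
The linked helper amounts to roughly this (a paraphrase, not the exact code): since each part's output already is a raw Response API item, rebuilding the input is mostly a flatten-and-sort.

from typing import Any

def db_to_openai_input(messages: list["Messages"]) -> list[dict[str, Any]]:
    # Hedged paraphrase of the linked utils.py helper: flatten every
    # message's parts, in order, into a list of Response API input items.
    return [
        part.output
        for message in messages
        for part in sorted(message.parts, key=lambda p: p.order_index)
    ]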

AgentRoutine Change

I made everything work with the Pydantic schemas from OpenAI, so now there are no random dicts; everything is an OpenAI type. One very convenient change is that OpenAI now does the concatenation on their side: once a Part is finished streaming, OpenAI sends us a chunk with the complete part.

temp_stream_data: dict[str, Any] = {

This keeps the temporary data if the user stops the stream. It also keeps the current tool calls that need to be executed until they are done.
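
Pieced together from the hunks further down, its shape is at least the following (there may be keys I missed):

temp_stream_data: dict[str, Any] = {
    "reasoning": {},        # item_id -> in-flight reasoning item
    "tool_calls": {},       # item_id -> in-flight function tool call
    "tool_to_execute": {},  # item_id -> completed call awaiting execution
}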

The new parts are appended (when they are complete) both to the history and to the new_message with this function:
https://github.com/openbraininstitute/neuroagent/blob/f6250e7cb16586d049e05bb9e5b72ea08b23c1a0/backend/src/neuroagent/utils.py#L250C1-L272C27

Vercel v5

A lot of small things changed. The main annoyance was that the way the chunks are streamed changed completely.
But with the switch to the Response API, one Vercel chunk == one Response chunk, so it was not that complicated.

Comment on lines +138 to +156
class Parts(Base):
    """SQL table for storing Response API parts (JSONB format)."""

    __tablename__ = "parts"
    part_id: Mapped[uuid.UUID] = mapped_column(
        UUID, primary_key=True, default=lambda: uuid.uuid4()
    )
    message_id: Mapped[uuid.UUID] = mapped_column(
        UUID, ForeignKey("messages.message_id"), nullable=False
    )
    order_index: Mapped[int] = mapped_column(Integer, nullable=False)
    type: Mapped[PartType] = mapped_column(Enum(PartType), nullable=False)
    output: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
    is_complete: Mapped[bool] = mapped_column(Boolean, nullable=False)
    validated: Mapped[bool] = mapped_column(Boolean, nullable=True)

    message: Mapped[Messages] = relationship("Messages", back_populates="parts")

    __table_args__ = (Index("ix_parts_message_id", "message_id"),)

I decided to only keep the necessary fields in the table. output is the raw OpenAI output. This is slightly annoying because if we want to get to the content of a message, it is nested quite deeply, for example:
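
# Illustrative access path: `output` holds a raw Response API message
# item, so the text sits two levels down.
text = part.output["content"][0]["text"]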

WebSearchTool,
# NowTool,
# WeatherTool,
WeatherTool,

I will remove it. Left it for review.

return {"input_cached": None, "input_noncached": None, "completion": None}


def append_part(

I defined a couple of functions here to make AgentRoutine more readable. Feel free to protest. A sketch of append_part is below.
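
From its call sites, append_part behaves roughly like this (a sketch, not the real implementation in utils.py):

def append_part(new_message, history, item, part_type):
    # Sketch inferred from the call sites: serialize the completed
    # Response API item once, then record it in both places.
    payload = item.model_dump()      # item is an OpenAI pydantic type
    history.append(payload)          # JSON input for OpenAI's next call
    new_message.parts.append(        # row for the `parts` table
        Parts(
            order_index=len(new_message.parts),
            type=part_type,
            output=payload,
            is_complete=True,
        )
    )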

Comment on lines +128 to +135
setMessages((prevState) => {
  prevState[prevState.length - 1] = {
    ...prevState[prevState.length - 1],
    isComplete: false,
  };
  // We only change the metadata at message level and keep the rest.
  return prevState;
});

Might need to be revisited.

};
},
messages: retrievedMessages,
experimental_throttle: 50,

This eliminates the infinite re-render bugs, which are otherwise very frequent. Except when switching windows while the chat was streaming, I never encountered the bug again.

One thing to note: the infinite re-render bug is now caught by our error handler and stops the chat.

},
messages: retrievedMessages,
experimental_throttle: 50,
sendAutomaticallyWhen: lastAssistantHasAllToolOutputs,

Now we have to specify when we want the agent to send a message automatically.

Comment on lines +113 to +122
// Handle chat inputs.
const [input, setInput] = useState("");
const handleSubmit = (
e: React.FormEvent<HTMLFormElement | HTMLTextAreaElement>,
) => {
e.preventDefault();
sendMessage({ text: input });
setInput("");
};


Input changes and submits are now our responsibility, not Vercel's.

Comment on lines -154 to -170
// Handle streaming interruption
useEffect(() => {
  if (stopped) {
    setMessages((prevState) => {
      prevState[prevState.length - 1] = {
        ...prevState[prevState.length - 1],
        annotations: prevState
          .at(-1)
          ?.annotations?.map((ann) =>
            !ann.toolCallId ? { isComplete: false } : ann,
          ),
      };
      // We only change the annotation at message level and keep the rest.
      return prevState;
    });
  }
}, [stopped, setMessages]);

Might want to revisit that ... it works for now.

Comment on lines 172 to +180
useEffect(() => {
  if (isInvalidating || isFetching) return;
  // Set retrieved DB messages as current messages
  if (!stopped) {
    setMessages(() => [
      ...retrievedMessages,
      ...messages.filter(
        (m) => m.id.length !== 36 && !m.id.startsWith("msg"),
      ),
    ]);
  } else {
    setMessages(retrievedMessages);
  }
  // eslint-disable-next-line react-hooks/exhaustive-deps
}, [isInvalidating, isFetching]); // Re-run on new fetching or stop

I wanted to get rid of this. In my testing it works fine. But please do check again.


// Observer to fetch new pages :
useEffect(() => {
const container = containerRef.current;

I did some cleaning.

Comment on lines +48 to 51
root: scrollContainer,
rootMargin: "100px",
threshold: 0.1,
},

Without that there was a bug: sometimes new threads were not fetched.

Comment on lines +1 to +10
export const env = {
// Server
SERVER_SIDE_BACKEND_URL: process.env.SERVER_SIDE_BACKEND_URL,
NEXTAUTH_SECRET: process.env.NEXTAUTH_SECRET,
KEYCLOAK_ID: process.env.KEYCLOAK_ID,
KEYCLOAK_SECRET: process.env.KEYCLOAK_SECRET,
KEYCLOAK_ISSUER: process.env.KEYCLOAK_ISSUER,
// Client
NEXT_PUBLIC_BACKEND_URL: process.env.NEXT_PUBLIC_BACKEND_URL,
};

There was a compatibility issue between the new Vercel version and another package that was very old. I did this as one of the first steps of the PR. It is probably bad, but I did not have many issues when testing.

async def execute_tool_calls(
self,
tool_calls: list[ToolCalls],
tool_calls: list[ResponseFunctionToolCall],

I switched everything to the OpenAI types in the AgentRoutine.

await new_message.awaitable_attrs.token_consumption
# Initialize metadata for previous HIL tool calls
tool_map = {tool.name: tool for tool in active_agent.tools}
metadata_data = get_previous_hil_metadata(new_message, tool_map)

I did that to make the streamed Vercel messages and the messages from the DB as close as possible, so that handling everything in the frontend is easier.

and choice.delta.reasoning
usage_data = None
# for streaming interrupt and handling tool calls
temp_stream_data: dict[str, Any] = {

Now OpenAI gives us the whole part after it finishes streaming; they do the concatenation on their side. We just need to keep the temporary data if the user stops.

IMPORTANT: this is also where we store the tool calls that need to be executed after the stream finishes.

isinstance(event.item, ResponseReasoningItem)
and event.item.id
):
append_part(

We append to both:

  • history (JSON version for OpenAI's next call)
  • new_message (for the database)

Then we immediately delete from temp_stream_data --> so we know what still needs to be added to the DB in case of stopping.

isinstance(event.item, ResponseOutputMessage)
and event.item.id
):
append_part(

Same here.

# Tool call (args) deltas
case ResponseFunctionCallArgumentsDeltaEvent() if event.item_id:
# No call_id in the stream, we have to get it from temp_stream_data
tool_call_id = temp_stream_data["tool_calls"][

For some reason, OpenAI does not stream the call_id in the deltas.

Since the call_id is the only thing we have in the old DB schema before migration, and it is the ONLY mandatory field in the OpenAI Response API for identifying tool calls, I decided to keep the call_id as the main way to identify a tool call.

The frontend works only with this call_id, nothing else. Feel free to challenge that decision.
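
Concretely, the lookup in the delta handler is along these lines (my reading of the truncated hunk above; the exact attribute access is an assumption):

# Deltas only carry item_id, so the call_id comes from the in-flight
# item stored in temp_stream_data when the tool call started.
tool_call_id = temp_stream_data["tool_calls"][event.item_id].call_id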

Comment on lines +437 to +444
append_part(
new_message, history, event.item, PartType.FUNCTION_CALL
)
# Tool call ready --> remove from tool_calls, add to tool_to_execute
temp_stream_data["tool_calls"].pop(event.item.id, None)
temp_stream_data["tool_to_execute"][event.item.id] = (
event.item
)

This is how I track the pending tool calls: I add the item to history and new_message, remove it from tool_calls, and then put it in tool_to_execute.

Here, since these are new items coming from the Response API, we can use item.id to refer to them. The same holds every time I assign something to temp_stream_data.

Comment on lines +453 to +455
# case _:
# print(event.type)
# Some events are not needed. Not sure what we should do with them yet.

Some cases were not taken into account since they were duplicates. For example, the "true reasoning" (and not the summaries) may appear in other event types. Since OpenAI does not show them, I am not sure how to test it...

}
yield f"d:{json.dumps(done_data)}\n"
# End of agent loop. Add new message to DB.
messages.append(new_message)

history and new_message are updated during the loop. We just need to append new_message to the DB messages list.

except asyncio.exceptions.CancelledError:
if isinstance(message["tool_calls"], defaultdict):
message["tool_calls"] = list(message.get("tool_calls", {}).values())
# add parts not appended to `new_message`

We try to append the missing parts to new_message with is_complete = False. Then we simply append new_message, and the background task will write it to the DB.

# add parts not appended to `new_message`
if temp_stream_data["reasoning"]:
for reasoning_item in temp_stream_data["reasoning"].values():
del reasoning_item.id

OpenAI hates lonely reasoning parts. If I do not remove the ID it is not happy ...

content=json.dumps(message),
tool_calls=tool_calls,
is_complete=False,
append_part(

We append the incomplete tool calls plus a matching tool_call_output so that OpenAI is happy.

@BoBer78 BoBer78 marked this pull request as ready for review December 9, 2025 15:16