Vercel v5 #557
base: main
Conversation
… Need to still adjust to Vercel's new types
```diff
 class Parts(Base):
     """SQL table for storing Response API parts (JSONB format)."""

     __tablename__ = "parts"
     part_id: Mapped[uuid.UUID] = mapped_column(
         UUID, primary_key=True, default=lambda: uuid.uuid4()
     )
     message_id: Mapped[uuid.UUID] = mapped_column(
-        UUID, ForeignKey("messages.message_id")
+        UUID, ForeignKey("messages.message_id"), nullable=False
     )
-    message: Mapped[Messages] = relationship("Messages", back_populates="tool_calls")
     order_index: Mapped[int] = mapped_column(Integer, nullable=False)
     type: Mapped[PartType] = mapped_column(Enum(PartType), nullable=False)
     output: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
     is_complete: Mapped[bool] = mapped_column(Boolean, nullable=False)
     validated: Mapped[bool] = mapped_column(Boolean, nullable=True)

+    message: Mapped[Messages] = relationship("Messages", back_populates="parts")

     __table_args__ = (Index("ix_parts_message_id", "message_id"),)
```
I decided to keep only the necessary fields in the table. `output` is the raw OpenAI output. This is slightly annoying because if we want to get to the content of a message, it is nested quite deeply.
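To illustrate the nesting, here is a hedged example of pulling the text out of a stored message part; the payload shape follows the Response API "message" item and the values are made up.

```python
# Illustration only: `output` holding a raw Response API "message" item.
output = {
    "type": "message",
    "role": "assistant",
    "content": [{"type": "output_text", "text": "Hello!", "annotations": []}],
}

# The actual text sits two levels deep inside the part's JSONB payload.
text = "".join(
    c["text"] for c in output.get("content", []) if c.get("type") == "output_text"
)
print(text)  # Hello!
```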
```python
WebSearchTool,
# NowTool,
# WeatherTool,
WeatherTool,
```
I will remove it. Left it for review.
| return {"input_cached": None, "input_noncached": None, "completion": None} | ||
|
|
||
|
|
||
| def append_part( |
I defined a couple of functions here to make AgentRoutine more readable. Feel free to protest.
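For readers of the diff, here is a rough sketch of what a helper like `append_part` could look like, inferred from its call sites further down. `Messages`, `Parts` and `PartType` are the ORM classes from this PR; the signature, the `is_complete` default and the body are assumptions, not the actual implementation in `neuroagent/utils.py`.

```python
from typing import Any

from pydantic import BaseModel


def append_part(
    new_message: "Messages",
    history: list[dict[str, Any]],
    item: BaseModel,
    part_type: "PartType",
    is_complete: bool = True,
) -> None:
    """Persist a finished Response API item for OpenAI (history) and for the DB (parts)."""
    payload = item.model_dump(exclude_none=True)
    history.append(payload)  # JSON version sent back to OpenAI on the next call
    new_message.parts.append(  # ORM version written to the `parts` table later
        Parts(
            order_index=len(new_message.parts),
            type=part_type,
            output=payload,
            is_complete=is_complete,
        )
    )
```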
```tsx
setMessages((prevState) => {
  prevState[prevState.length - 1] = {
    ...prevState[prevState.length - 1],
    isComplete: false,
  };
  // We only change the metadata at message level and keep the rest.
  return prevState;
});
```
Might need to be revisited.
```tsx
    };
  },
  messages: retrievedMessages,
  experimental_throttle: 50,
```
This eliminates all the infinite re-render bugs; otherwise they are very frequent. Except when switching windows while the chat was streaming, I never encountered the bug again.
One thing to note: the infinite re-render bug is now caught by our error handler and stops the chat.
```tsx
  },
  messages: retrievedMessages,
  experimental_throttle: 50,
  sendAutomaticallyWhen: lastAssistantHasAllToolOutputs,
```
Now we have to specify when we want the agent to send a message automatically.
```tsx
// Handle chat inputs.
const [input, setInput] = useState("");
const handleSubmit = (
  e: React.FormEvent<HTMLFormElement | HTMLTextAreaElement>,
) => {
  e.preventDefault();
  sendMessage({ text: input });
  setInput("");
};
```
Input changes and submit are now our responsibility, not Vercel's.
```tsx
// Handle streaming interruption
useEffect(() => {
  if (stopped) {
    setMessages((prevState) => {
      prevState[prevState.length - 1] = {
        ...prevState[prevState.length - 1],
        annotations: prevState
          .at(-1)
          ?.annotations?.map((ann) =>
            !ann.toolCallId ? { isComplete: false } : ann,
          ),
      };
      // We only change the annotation at message level and keep the rest.
      return prevState;
    });
  }
}, [stopped, setMessages]);
```
Might want to revisit that ... it works for now.
```diff
 useEffect(() => {
   if (isInvalidating || isFetching) return;
   // Set retrieved DB messages as current messages
   if (!stopped) {
     setMessages(() => [
       ...retrievedMessages,
       ...messages.filter(
-        (m) => m.id.length !== 36 && !m.id.startsWith("temp"),
+        (m) => m.id.length !== 36 && !m.id.startsWith("msg"),
       ),
     ]);
   } else {
     setMessages(retrievedMessages);
   }
-}, [md5(JSON.stringify(retrievedMessages))]); // Rerun on content change
+  // eslint-disable-next-line react-hooks/exhaustive-deps
+}, [isInvalidating, isFetching]); // Re-run on new fetching or stop
```
I wanted to get rid of this. In my testing it works fine. But please do check again.
```tsx
// Observer to fetch new pages:
useEffect(() => {
  const container = containerRef.current;
```
I did some cleaning.
```tsx
  root: scrollContainer,
  rootMargin: "100px",
  threshold: 0.1,
},
```
Without that there was a bug: sometimes new threads were not fetched.
```tsx
export const env = {
  // Server
  SERVER_SIDE_BACKEND_URL: process.env.SERVER_SIDE_BACKEND_URL,
  NEXTAUTH_SECRET: process.env.NEXTAUTH_SECRET,
  KEYCLOAK_ID: process.env.KEYCLOAK_ID,
  KEYCLOAK_SECRET: process.env.KEYCLOAK_SECRET,
  KEYCLOAK_ISSUER: process.env.KEYCLOAK_ISSUER,
  // Client
  NEXT_PUBLIC_BACKEND_URL: process.env.NEXT_PUBLIC_BACKEND_URL,
};
```
There was some compatibility issue between the new Vercel package and another package that was super old. I did that as one of the first steps of the PR. This is probably bad. I did not have many issues when testing.
```diff
 async def execute_tool_calls(
     self,
-    tool_calls: list[ToolCalls],
+    tool_calls: list[ResponseFunctionToolCall],
```
I switched everything to the OpenAI types in the AgentRoutine.
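For reference, these are the Response API models used in the hunks below; in recent versions of the openai Python SDK they should be importable from `openai.types.responses` (worth double-checking against the pinned SDK version).

```python
from openai.types.responses import (
    ResponseFunctionCallArgumentsDeltaEvent,
    ResponseFunctionToolCall,
    ResponseOutputMessage,
    ResponseReasoningItem,
)
```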
```python
await new_message.awaitable_attrs.token_consumption
# Initialize metadata for previous HIL tool calls
tool_map = {tool.name: tool for tool in active_agent.tools}
metadata_data = get_previous_hil_metadata(new_message, tool_map)
```
I did that to make the streamed Vercel messages and the messages from the DB as close as possible, to make handling everything in the frontend easier.
```python
and choice.delta.reasoning
usage_data = None
# for streaming interrupt and handling tool calls
temp_stream_data: dict[str, Any] = {
```
Now OpenAI gives us the whole part after it has finished streaming; they do the concatenation on their side. We just need to keep the temporary data in case the user stops.
IMPORTANT: this acts as the place to store the tool calls that need to be executed after the stream finishes.
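The assumed shape, with only the keys visible in the hunks of this PR; anything still in here when the stream stops has not yet been written to history / new_message.

```python
from typing import Any

# Keys inferred from the diffs below; the real initialization may differ.
temp_stream_data: dict[str, Any] = {
    "reasoning": {},        # item_id -> reasoning item still being streamed
    "tool_calls": {},       # item_id -> tool call still receiving argument deltas
    "tool_to_execute": {},  # item_id -> completed tool calls waiting to be executed
}
```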
```python
    isinstance(event.item, ResponseReasoningItem)
    and event.item.id
):
    append_part(
```
We append to both:
- history (JSON version for OpenAI's next call)
- new_message (for the database)

Then we immediately delete from temp_stream_data --> so we know what needs to be added to the DB in case of stopping.
```python
    isinstance(event.item, ResponseOutputMessage)
    and event.item.id
):
    append_part(
```
Same here.
```python
# Tool call (args) deltas
case ResponseFunctionCallArgumentsDeltaEvent() if event.item_id:
    # No call ID in the stream, we have to get it from temp stream data
    tool_call_id = temp_stream_data["tool_calls"][
```
For some reason, OpenAI does not stream the call_id in the deltas.
Since the call_id is the only thing we have in the old DB schema before migration, and is the ONLY mandatory field in the OpenAI Response API for identifying tool calls, I made the decision to keep the call_id as the main way to identify a tool call.
The frontend works only with this call_id, nothing else. Feel free to challenge that decision.
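The assumed lookup pattern (completing the truncated line above): the argument-delta events only carry item_id, so the call_id used by the frontend is read off the partially streamed tool call stored under that item_id.

```python
# Sketch, not the exact repo code: map the stream's item_id back to the call_id.
tool_call_id = temp_stream_data["tool_calls"][event.item_id].call_id
```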
```python
append_part(
    new_message, history, event.item, PartType.FUNCTION_CALL
)
# Tool call ready --> remove from tool_calls, add to tool_to_execute
temp_stream_data["tool_calls"].pop(event.item.id, None)
temp_stream_data["tool_to_execute"][event.item.id] = (
    event.item
)
```
This is how I track the pending tool calls: I add the call to history and new_message, remove it from tool_calls and then put it in tool_to_execute.
Here, since these are new items coming from the Response API, we can use item.id to refer to them. The same holds every time I assign something to temp_stream_data.
```python
# case _:
#     print(event.type)
# Some events are not needed. Not sure what we should do with them yet.
```
Some cases were not taken into account since they were duplicates. For example, the "true reasoning" (and not the summaries) may appear in other event types. Since OpenAI does not show it, I am not sure how to test it ...
```python
}
yield f"d:{json.dumps(done_data)}\n"
# End of agent loop. Add new message to DB.
messages.append(new_message)
```
history and new_message are updated during the loop. We just need to append new_message to the DB messages list.
```python
except asyncio.exceptions.CancelledError:
    if isinstance(message["tool_calls"], defaultdict):
        message["tool_calls"] = list(message.get("tool_calls", {}).values())
    # add parts not appended to `new_message`
```
We try to append the missing parts to new_message with is_complete = False. Then we simply append new_message to the messages list and the background task will add it to the DB.
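A hedged sketch of that interruption path, using the names from the surrounding hunks; the part types, the is_complete flag and the control flow are assumptions, not the repo's exact code.

```python
def flush_pending_parts(temp_stream_data, new_message, history) -> None:
    """On cancellation, turn whatever is still streaming into incomplete DB parts."""
    for item in temp_stream_data["tool_calls"].values():
        append_part(new_message, history, item, PartType.FUNCTION_CALL, is_complete=False)
    for item in temp_stream_data["reasoning"].values():
        item.id = None  # orphaned reasoning items must lose their id (see the next comment)
        append_part(new_message, history, item, PartType.REASONING, is_complete=False)
```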
```python
# add parts not appended to `new_message`
if temp_stream_data["reasoning"]:
    for reasoning_item in temp_stream_data["reasoning"].values():
        del reasoning_item.id
```
OpenAI hates lonely reasoning parts. If I do not remove the ID it is not happy ...
```python
content=json.dumps(message),
tool_calls=tool_calls,
is_complete=False,
append_part(
```
We append the incomplete tool calls + the corresponding tool_call_output so that OpenAI is happy.
Closes #580 #566 #352
Main things
Messages DB schema is now completely different --> it mimics the Response API.
DB Schema change
IMPORTANT: if you want to back up your local DB before running `alembic upgrade`, I used:

I only kept the mandatory fields so that we can send the messages back to OpenAI. Some non-mandatory info is lost in the process.
The DB messages go from `User`, `AI_TOOL`, `TOOL` and `AI_MESSAGE` to only `User` and `Assistant`. Everything is then stored in the `Parts` of the messages:
- `User` messages only have one `Message` part
- `Assistant` messages can have `Reasoning`, `Tool_call`, `Tool_call_output` and `Message` parts (any number, in any order)

One `Assistant` message now corresponds to one full turn of the agent loop in AgentRoutine.

The most complex part of the PR is the migration script. I tested it extensively: you can write some messages, upgrade to the new schema, write some more messages, downgrade, and repeat 10 times. It works. I did not test all the weird edge cases, like stopping a HIL message at a very specific timing.
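To make the new layout concrete, here is an illustrative (made-up) example of the parts one `Assistant` message could end up with after a full agent-loop turn; the type strings and payloads are assumptions for illustration, not the exact enum values.

```python
assistant_parts = [
    {"order_index": 0, "type": "reasoning",
     "output": {"type": "reasoning", "summary": []}},
    {"order_index": 1, "type": "function_call",
     "output": {"type": "function_call", "call_id": "call_1",
                "name": "WeatherTool", "arguments": '{"city": "Geneva"}'}},
    {"order_index": 2, "type": "function_call_output",
     "output": {"type": "function_call_output", "call_id": "call_1", "output": "12°C"}},
    {"order_index": 3, "type": "message",
     "output": {"type": "message", "role": "assistant",
                "content": [{"type": "output_text", "text": "It is 12°C in Geneva."}]}},
]
```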
Response API change
IMPORTANT: the Response API of OpenRouter is still in beta. It works fine when not streaming, but when streaming they sometimes send the chunks in a different order, and the main LLM is not compatible with it for now.
For non-streamed responses or parsed structured output it works fine.
With the new DB schema, the only thing we have to do is:
https://github.com/openbraininstitute/neuroagent/blob/f6250e7cb16586d049e05bb9e5b72ea08b23c1a0/backend/src/neuroagent/utils.py#L28C1-L40C27
And we can send it to OpenAI.
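A hedged sketch of the idea behind that helper: since each part's `output` column already stores the raw Response API item, rebuilding the model input is essentially flattening the stored JSON back into an ordered list. Names here are illustrative; the real code is at the link above.

```python
from typing import Any


def db_messages_to_openai_input(db_messages: list["Messages"]) -> list[dict[str, Any]]:
    history: list[dict[str, Any]] = []
    for message in db_messages:
        for part in sorted(message.parts, key=lambda p: p.order_index):
            history.append(part.output)  # already in Response API format
    return history
```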
AgentRoutine Change
I made everything work with the Pydantic schemas from OpenAI, so now there are no random dicts; everything is an OpenAI type. One very convenient change is that OpenAI now does the concatenation on their side: once a `Part` has finished streaming, OpenAI sends us a chunk with the complete part (see neuroagent/backend/src/neuroagent/agent_routine.py, line 293 in f6250e7).
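A minimal, self-contained sketch of that streaming behaviour, assuming the official openai Python SDK: the API emits a per-item "done" event carrying the fully concatenated item, so no manual delta concatenation is needed.

```python
import asyncio

from openai import AsyncOpenAI
from openai.types.responses import ResponseOutputItemDoneEvent


async def main() -> None:
    client = AsyncOpenAI()
    stream = await client.responses.create(
        model="gpt-4.1-mini", input="Say hello", stream=True
    )
    async for event in stream:
        if isinstance(event, ResponseOutputItemDoneEvent):
            print(event.item)  # a complete part: message, reasoning, tool call, ...


asyncio.run(main())
```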
The new parts are appended (when they are complete) both to the history and to the new_message with this function:
https://github.com/openbraininstitute/neuroagent/blob/f6250e7cb16586d049e05bb9e5b72ea08b23c1a0/backend/src/neuroagent/utils.py#L250C1-L272C27
Vercel v5
A lot of small things were changed. The main annoyance was that the way the chunks are streamed changed completely.
But with the switch to the Response API, one Vercel chunk == one Response chunk, so it was not that complicated.