feat: add workflow query endpoint for agent state inspection#173
…al workflow queries
Expose Temporal workflow queries via the tasks REST API. The endpoint delegates to the existing TemporalAdapter.query_workflow method and relies on the global exception handler for error mapping:
- TemporalWorkflowNotFoundError → 404
- TemporalQueryError → 400 (changed from 500 to reflect client error)
- Other TemporalError subclasses → 500
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds a default @workflow.query handler named "get_current_state"
to BaseWorkflow. Returns a _workflow_state string that defaults
to "initialized".
Agents using StateMachine should override this to return the
state machine's current state, enabling external callers to
detect turn completion (state == "waiting_for_input").
Example override:
@workflow.query(name="get_current_state")
def get_current_state(self) -> str:
    return self.state_machine.get_current_state()
Companion to scaleapi/scale-agentex#173 which adds
GET /tasks/{task_id}/query/{query_name} endpoint.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Greptile correctly flagged that TemporalQueryError is used as a catch-all in the adapter, not just for missing query handlers. Keeping it as 500 (ServiceError) is correct since connectivity, serialization, and other server-side failures also raise this. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
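The error mapping that results from this revert can be sketched as below. This is an illustration only: the exception classes are local stand-ins declared for the example, their inheritance is assumed from the commit messages, and `status_for` is a hypothetical helper, not the project's global exception handler.

```python
from http import HTTPStatus


# Hypothetical stand-ins for the exception names used in the commit messages.
class TemporalError(Exception):
    pass


class TemporalWorkflowNotFoundError(TemporalError):
    pass


class TemporalQueryError(TemporalError):
    pass


def status_for(exc: Exception) -> int:
    """Map an adapter exception to an HTTP status code (illustrative helper)."""
    if isinstance(exc, TemporalWorkflowNotFoundError):
        return int(HTTPStatus.NOT_FOUND)
    if isinstance(exc, TemporalError):
        # Includes TemporalQueryError, which the adapter raises as a catch-all
        # for connectivity and serialization failures, so it stays a 500.
        return int(HTTPStatus.INTERNAL_SERVER_ERROR)
    # Anything outside the Temporal hierarchy is not handled by this sketch.
    raise exc
```

The order of the checks matters: the more specific not-found case has to be tested before the base class.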
Adds a default @workflow.query handler named "get_current_state" to BaseWorkflow. Automatically returns the state machine's current state if self.state_machine exists (covers all StateMachine-based agents without any override needed). Falls back to _workflow_state string for non-state-machine agents.
Enables external callers to detect turn completion:
"waiting_for_input" = agent is done, ready for next message
"researching" = agent is still working
Companion to scaleapi/scale-agentex#173 which adds GET /tasks/{task_id}/query/{query_name} endpoint.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
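A minimal sketch of the fallback logic this commit describes, written as plain Python so it runs without the Temporal SDK. The `@workflow.query(name="get_current_state")` decorator is deliberately left out, and `FakeStateMachine` and `ResearchWorkflow` are hypothetical names used only for illustration.

```python
class BaseWorkflow:
    """Plain-Python stand-in; the real handler would carry @workflow.query."""

    def __init__(self) -> None:
        # Default taken from the earlier commit message ("initialized").
        self._workflow_state = "initialized"

    def get_current_state(self) -> str:
        # Prefer the state machine when the subclass has one.
        state_machine = getattr(self, "state_machine", None)
        if state_machine is not None:
            return state_machine.get_current_state()
        # Otherwise fall back to the plain string attribute.
        return self._workflow_state


class FakeStateMachine:
    """Hypothetical stand-in for the SDK's StateMachine."""

    def __init__(self, state: str) -> None:
        self._state = state

    def get_current_state(self) -> str:
        return self._state


class ResearchWorkflow(BaseWorkflow):
    """Hypothetical agent that owns a state machine and needs no override."""

    def __init__(self) -> None:
        super().__init__()
        self.state_machine = FakeStateMachine("researching")
```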
Adds a default @workflow.query handler named "get_current_state"
to BaseWorkflow. Returns "unknown" by default — agents override
to report their actual state.
Example for StateMachine-based agents:
@workflow.query(name="get_current_state")
def get_current_state(self) -> str:
    return self.state_machine.get_current_state()
Companion to scaleapi/scale-agentex#173 which adds
GET /tasks/{task_id}/query/{query_name} endpoint.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
handle.query(name, None) passes None as a positional argument to the query handler, causing "takes 1 positional argument but 2 were given" for handlers that take no args. Only pass arg when it's not None. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
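The failure mode and the fix can be reproduced with a small stand-in. `FakeHandle` below is a hypothetical class that forwards positional arguments to the handler; it is not the Temporal SDK's workflow handle, and `query_workflow` is a simplified sketch of the adapter method rather than its actual code.

```python
import asyncio


class FakeHandle:
    """Hypothetical handle: forwards any positional args to the handler."""

    def __init__(self, handlers):
        self._handlers = handlers

    async def query(self, name, *args):
        return self._handlers[name](*args)


def get_current_state() -> str:
    # A query handler that accepts no arguments.
    return "waiting_for_input"


async def query_workflow(handle, query, arg=None):
    # Only forward the argument when the caller supplied one; forwarding an
    # explicit None would be delivered to the handler as a real argument.
    if arg is not None:
        return await handle.query(query, arg)
    return await handle.query(query)
```

Calling `handle.query("get_current_state", None)` directly on the stand-in raises a `TypeError`, while `query_workflow(handle, "get_current_state")` returns the state.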
@router.get(
    "/{task_id}/query/{query_name}",
Can we add tests for this new route?
result = await temporal_adapter.query_workflow(
    workflow_id=task_id,
    query=query_name,
)
No validation that the task exists before querying Temporal
The endpoint passes task_id directly as workflow_id to Temporal without first checking that a task with this ID exists in the database. If a caller provides a valid-format UUID that doesn't correspond to a task (but does match a Temporal workflow started by another system, e.g., a healthcheck workflow), they'd get a result from the wrong workflow.
Other endpoints in this file first load the task via task_use_case.get_task(id=task_id) which returns a 404 if the task doesn't exist. This endpoint should do the same to maintain consistency and prevent querying unrelated workflows.
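One way the suggested check could look, as a sketch under assumptions: `InMemoryTasks`, `FakeAdapter`, `TaskNotFound`, and `query_task_workflow` are hypothetical stand-ins, and only the `get_task(id=...)` and `query_workflow(workflow_id=..., query=...)` call shapes come from the review above.

```python
import asyncio


class TaskNotFound(Exception):
    """Hypothetical error; the real route would surface this as a 404."""


class InMemoryTasks:
    """Hypothetical stand-in for task_use_case."""

    def __init__(self, ids):
        self._ids = set(ids)

    async def get_task(self, id):
        if id not in self._ids:
            raise TaskNotFound(id)
        return {"id": id}


class FakeAdapter:
    """Hypothetical stand-in for the Temporal adapter."""

    async def query_workflow(self, workflow_id, query):
        return "waiting_for_input"


async def query_task_workflow(task_id, query_name, tasks, adapter):
    # Load the task first so unknown IDs fail before Temporal is contacted.
    task = await tasks.get_task(id=task_id)
    result = await adapter.query_workflow(workflow_id=task["id"], query=query_name)
    return {"task_id": task_id, "query": query_name, "result": result}
```

Placing the lookup ahead of the adapter call means a workflow started by another system can never be reached through this route, because its ID will not resolve to a task.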
Path: agentex/src/api/routes/tasks.py
Line: 235-238
Summary
Adds `GET /tasks/{task_id}/query/{query_name}` endpoint that proxies Temporal workflow queries through the Agentex REST API.
Problem
When programmatically invoking agentic (Temporal) agents, callers have no way to know when the agent has finished processing a turn. The task status stays `RUNNING` throughout. Agents using the state machine SDK internally track their state (`waiting_for_input`, `researching`, etc.) but this isn't exposed externally.
Solution
Expose Temporal's built-in workflow query API through the existing REST API. Agents that register `@workflow.query` handlers can now be queried for their current state without affecting execution.
New endpoint
Changes
- `tasks.py` using existing `DTemporalAdapter` dependency
- `TemporalQueryError` from 500 to 400 (invalid query is a client error)
Companion change needed
Agents need to register `@workflow.query` handlers to be queryable. The state machine SDK should add a default `get_current_state` query handler; this is tracked separately in the SDK repo.
Use case
The Agent Plane communication service (meta-registry-comms-svc) invokes agents via RPC and polls for responses. With this endpoint, it can check `get_current_state` to detect when the agent transitions back to `waiting_for_input`, providing a reliable turn-completion signal for multi-turn conversations.
Follows the same pattern as Google A2A protocol's `INPUT_REQUIRED` task state.
🤖 Generated with Claude Code
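The polling use case could be sketched as follows. `wait_for_turn_completion` and its parameters are hypothetical; `fetch_state` is assumed to be a caller-supplied function that issues `GET /tasks/{task_id}/query/get_current_state` and returns the state string from the response.

```python
import time
from typing import Callable


def wait_for_turn_completion(
    fetch_state: Callable[[], str],
    *,
    done_state: str = "waiting_for_input",
    interval_s: float = 1.0,
    timeout_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll fetch_state until it reports done_state or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if fetch_state() == done_state:
            return True
        if time.monotonic() >= deadline:
            # Gave up: the agent never reported the done state in time.
            return False
        sleep(interval_s)
```

Injecting `fetch_state` keeps the loop independent of any HTTP client, so the same helper works with whichever transport the caller already uses.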
Greptile Summary
Adds a `GET /tasks/{task_id}/query/{query_name}` endpoint to proxy Temporal workflow queries through the REST API, enabling callers to inspect agent state (e.g., `waiting_for_input`) without affecting execution. Also fixes a bug in `TemporalAdapter.query_workflow` where `None` was incorrectly passed as an argument to the Temporal SDK's `handle.query()`.
- The new route bypasses the domain use case layer and calls `DTemporalAdapter` directly from the API layer. This is the only route in `src/api/routes/` that does so, deviating from the documented clean architecture.
- The endpoint does not validate that the task exists before querying Temporal, which could lead to querying unrelated workflows.
- The `None`-arg fix in the adapter is correct and prevents a runtime error when queries don't require arguments.
Confidence Score: 3/5
Functional but has architectural concerns and a missing task existence check that should be addressed before merge.
The adapter fix is solid, but the new endpoint skips task existence validation (allowing queries against potentially unrelated Temporal workflows) and breaks the established layered architecture by calling an adapter directly from the route layer.
Pay close attention to agentex/src/api/routes/tasks.py — the new endpoint should go through a use case and validate task existence.
Important Files Changed
Sequence Diagram
sequenceDiagram
    participant Client
    participant TasksRoute as tasks.py Route
    participant TemporalAdapter as TemporalAdapter
    participant Temporal as Temporal Server
    participant Workflow as Agent Workflow
    Client->>TasksRoute: GET /tasks/{task_id}/query/{query_name}
    Note over TasksRoute: Auth check via DAuthorizedId
    TasksRoute->>TemporalAdapter: query_workflow(workflow_id=task_id, query=query_name)
    TemporalAdapter->>Temporal: get_workflow_handle(task_id)
    TemporalAdapter->>Temporal: handle.query(query_name)
    Temporal->>Workflow: Execute @workflow.query handler
    Workflow-->>Temporal: Return state (e.g., "waiting_for_input")
    Temporal-->>TemporalAdapter: Query result
    TemporalAdapter-->>TasksRoute: result
    TasksRoute-->>Client: {"task_id": "...", "query": "...", "result": "..."}
Reviews (4): Last reviewed commit: "Merge branch 'main' into feat/workflow-q..."