feat: add workflow query endpoint for agent state inspection #173

Merged
gabrycina merged 4 commits into main from feat/workflow-query-endpoint
Mar 26, 2026
@gabrycina gabrycina commented Mar 24, 2026

Summary

Adds GET /tasks/{task_id}/query/{query_name} endpoint that proxies Temporal workflow queries through the Agentex REST API.

Problem

When programmatically invoking agentic (Temporal) agents, callers have no way to know when the agent has finished processing a turn. The task status stays RUNNING throughout. Agents using the state machine SDK internally track their state (waiting_for_input, researching, etc.) but this isn't exposed externally.

Solution

Expose Temporal's built-in workflow query API through the existing REST API. Agents that register @workflow.query handlers can now be queried for their current state without affecting execution.

New endpoint

GET /tasks/{task_id}/query/{query_name}
→ {"task_id": "...", "query": "get_current_state", "result": "waiting_for_input"}
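A minimal client-side sketch of calling this endpoint with only the standard library. The helper names and the base URL are assumptions for illustration, and authentication headers are omitted; the response shape follows the example above.

```python
import json
from typing import Any
from urllib.parse import quote
from urllib.request import urlopen


def build_query_url(base_url: str, task_id: str, query_name: str) -> str:
    """Build the URL for GET /tasks/{task_id}/query/{query_name}."""
    # Percent-encode both path segments so unusual values cannot alter the path.
    return (
        f"{base_url.rstrip('/')}/tasks/"
        f"{quote(task_id, safe='')}/query/{quote(query_name, safe='')}"
    )


def parse_query_result(body: str) -> Any:
    """Extract the "result" field from the JSON response body."""
    return json.loads(body)["result"]


def query_task(base_url: str, task_id: str, query_name: str) -> Any:
    """Send the request and return the query result (no auth in this sketch)."""
    with urlopen(build_query_url(base_url, task_id, query_name)) as resp:
        return parse_query_result(resp.read().decode("utf-8"))
```

Splitting URL construction and response parsing from the network call keeps both pieces testable without a running server.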

Changes

  • Added query route to tasks.py using existing DTemporalAdapter dependency
  • Changed TemporalQueryError from 500 to 400 (invalid query is a client error)

Companion change needed

Agents need to register @workflow.query handlers to be queryable. The state machine SDK should add a default get_current_state query handler — tracked separately in the SDK repo.

Use case

The Agent Plane communication service (meta-registry-comms-svc) invokes agents via RPC and polls for responses. With this endpoint, it can check get_current_state to detect when the agent transitions back to waiting_for_input, providing a reliable turn-completion signal for multi-turn conversations.

Follows the same pattern as Google A2A protocol's INPUT_REQUIRED task state.
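The polling described in this use case could look roughly like the sketch below. The function name, the timeout and interval defaults, and the injected `fetch_state` callable are all assumptions; in practice `fetch_state` would wrap a call to the new endpoint with the `get_current_state` query.

```python
import time
from typing import Callable


def wait_for_state(
    fetch_state: Callable[[], str],
    target: str = "waiting_for_input",
    timeout_s: float = 60.0,
    interval_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll fetch_state until it returns target; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        if fetch_state() == target:
            return True
        if time.monotonic() >= deadline:
            return False
        # Pause between polls; injectable so tests do not actually wait.
        sleep(interval_s)
```

Passing the state fetcher and the sleep function as parameters keeps the loop independent of any HTTP client and makes the timeout behaviour easy to test.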

🤖 Generated with Claude Code

Greptile Summary

Adds a GET /tasks/{task_id}/query/{query_name} endpoint to proxy Temporal workflow queries through the REST API, enabling callers to inspect agent state (e.g., waiting_for_input) without affecting execution. Also fixes a bug in TemporalAdapter.query_workflow where None was incorrectly passed as an argument to the Temporal SDK's handle.query().

  • The new route bypasses the domain use case layer and calls DTemporalAdapter directly from the API layer. This is the only route in src/api/routes/ that does so, deviating from the documented clean architecture.
  • The endpoint does not validate that the task exists before querying Temporal, which could lead to querying unrelated workflows.
  • The None-arg fix in the adapter is correct and prevents a runtime error when queries don't require arguments.

Confidence Score: 3/5

Functional but has architectural concerns and a missing task existence check that should be addressed before merge.

The adapter fix is solid, but the new endpoint skips task existence validation (allowing queries against potentially unrelated Temporal workflows) and breaks the established layered architecture by calling an adapter directly from the route layer.

Pay close attention to agentex/src/api/routes/tasks.py — the new endpoint should go through a use case and validate task existence.

Important Files Changed

| Filename | Overview |
| --- | --- |
| agentex/src/api/routes/tasks.py | Adds new GET /{task_id}/query/{query_name} endpoint. Bypasses the domain use case layer by calling DTemporalAdapter directly, and does not validate that the task exists before querying Temporal. |
| agentex/src/adapters/temporal/adapter_temporal.py | Fixes a bug where None was passed as an argument to Temporal handle.query(); the arg parameter is now omitted when it is None. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant TasksRoute as tasks.py Route
    participant TemporalAdapter as TemporalAdapter
    participant Temporal as Temporal Server
    participant Workflow as Agent Workflow

    Client->>TasksRoute: GET /tasks/{task_id}/query/{query_name}
    Note over TasksRoute: Auth check via DAuthorizedId
    TasksRoute->>TemporalAdapter: query_workflow(workflow_id=task_id, query=query_name)
    TemporalAdapter->>Temporal: get_workflow_handle(task_id)
    TemporalAdapter->>Temporal: handle.query(query_name)
    Temporal->>Workflow: Execute @workflow.query handler
    Workflow-->>Temporal: Return state (e.g., "waiting_for_input")
    Temporal-->>TemporalAdapter: Query result
    TemporalAdapter-->>TasksRoute: result
    TasksRoute-->>Client: {"task_id": "...", "query": "...", "result": "..."}
Prompt To Fix All With AI
This is a comment left during a code review.
Path: agentex/src/api/routes/tasks.py
Line: 229

Comment:
**Route bypasses domain layer, calling adapter directly**

Every other endpoint in this file delegates to a use case (`DTaskUseCase`, `DStreamsUseCase`), following the documented architecture: "API layer handles HTTP concerns, delegates to use cases" and "Dependencies flow inward (API → Domain ← Adapters)" (from `CLAUDE.md`).

This endpoint injects `DTemporalAdapter` directly into the route, which is the only place in `src/api/routes/` where an adapter is imported and called from the API layer (confirmed by grepping — `deployment_history.py` and `agents.py` only import adapter *exceptions*, not the adapter itself).

This matters because:
- It couples the HTTP layer to the Temporal infrastructure, making it harder to swap or mock the adapter
- It skips any domain-level validation (e.g., verifying the task exists before querying its workflow)
- It sets a precedent that erodes the layered architecture

Consider creating a method on `DTaskUseCase` (e.g., `query_task_workflow(task_id, query_name)`) that validates the task exists and then delegates to the temporal adapter.
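A rough sketch of what such a use case method might look like. Everything here is hypothetical: `TaskUseCase`, `TaskNotFoundError`, the in-memory set standing in for the task repository, and `FakeTemporalAdapter` are illustrative stand-ins, not the project's actual classes.

```python
import asyncio
from typing import Any


class TaskNotFoundError(Exception):
    """Hypothetical domain error; the API layer would map it to a 404."""


class TaskUseCase:
    """Hypothetical slice of a task use case, for illustration only."""

    def __init__(self, known_task_ids: set[str], temporal_adapter: Any) -> None:
        self._known_task_ids = known_task_ids  # stands in for the task repository
        self._temporal = temporal_adapter

    async def query_task_workflow(self, task_id: str, query_name: str) -> Any:
        # Validate the task first so unrelated workflow IDs are never queried.
        if task_id not in self._known_task_ids:
            raise TaskNotFoundError(task_id)
        return await self._temporal.query_workflow(
            workflow_id=task_id, query=query_name
        )


class FakeTemporalAdapter:
    """Test double that records which workflow IDs were queried."""

    def __init__(self) -> None:
        self.queried: list[str] = []

    async def query_workflow(self, workflow_id: str, query: str) -> str:
        self.queried.append(workflow_id)
        return "waiting_for_input"


if __name__ == "__main__":
    use_case = TaskUseCase({"task-1"}, FakeTemporalAdapter())
    print(asyncio.run(use_case.query_task_workflow("task-1", "get_current_state")))
```

With this shape, the route would only translate HTTP input into a use case call, and the existence check happens before Temporal is contacted.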

**Context Used:** CLAUDE.md ([source](https://app.greptile.com/review/custom-context?memory=54e85549-5e1a-4d52-b6ac-c8e71f4ea0e6))

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: agentex/src/api/routes/tasks.py
Line: 235-238

Comment:
**No validation that the task exists before querying Temporal**

The endpoint passes `task_id` directly as `workflow_id` to Temporal without first checking that a task with this ID exists in the database. If a caller provides a valid-format UUID that doesn't correspond to a task (but does match a Temporal workflow started by another system, e.g., a healthcheck workflow), they'd get a result from the wrong workflow.

Other endpoints in this file first load the task via `task_use_case.get_task(id=task_id)` which returns a 404 if the task doesn't exist. This endpoint should do the same to maintain consistency and prevent querying unrelated workflows.

How can I resolve this? If you propose a fix, please make it concise.

Reviews (4): Last reviewed commit: "Merge branch 'main' into feat/workflow-q..." | Re-trigger Greptile

Greptile also left 1 inline comment on this PR.

Context used:

  • CLAUDE.md (source)

…al workflow queries

Expose Temporal workflow queries via the tasks REST API. The endpoint
delegates to the existing TemporalAdapter.query_workflow method and
relies on the global exception handler for error mapping:
- TemporalWorkflowNotFoundError → 404
- TemporalQueryError → 400 (changed from 500 to reflect client error)
- Other TemporalError subclasses → 500

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@gabrycina gabrycina requested a review from a team as a code owner March 24, 2026 19:35
gabrycina added a commit to scaleapi/scale-agentex-python that referenced this pull request Mar 24, 2026
Adds a default @workflow.query handler named "get_current_state"
to BaseWorkflow. Returns a _workflow_state string that defaults
to "initialized".

Agents using StateMachine should override this to return the
state machine's current state, enabling external callers to
detect turn completion (state == "waiting_for_input").

Example override:

    @workflow.query(name="get_current_state")
    def get_current_state(self) -> str:
        return self.state_machine.get_current_state()

Companion to scaleapi/scale-agentex#173 which adds
GET /tasks/{task_id}/query/{query_name} endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Greptile correctly flagged that TemporalQueryError is used as a
catch-all in the adapter, not just for missing query handlers.
Keeping it as 500 (ServiceError) is correct since connectivity,
serialization, and other server-side failures also raise this.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
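As an illustration of the error mapping after this revert, the sketch below maps exception types to HTTP status codes. The three exception names come from the commit messages, but the class definitions and the `status_code_for` helper are stand-ins written for this example, not the project's global exception handler.

```python
class TemporalError(Exception):
    """Stand-in base class for Temporal adapter errors."""


class TemporalWorkflowNotFoundError(TemporalError):
    """Stand-in: the workflow ID does not exist."""


class TemporalQueryError(TemporalError):
    """Stand-in: catch-all for query failures, including server-side ones."""


def status_code_for(exc: Exception) -> int:
    """Map an exception to an HTTP status code."""
    if isinstance(exc, TemporalWorkflowNotFoundError):
        return 404
    # TemporalQueryError stays a 500 because it also covers connectivity
    # and serialization failures, not only unknown query names.
    # Assumption of this sketch: any other exception is also a 500.
    return 500
```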
gabrycina added a commit to scaleapi/scale-agentex-python that referenced this pull request Mar 24, 2026
Adds a default @workflow.query handler named "get_current_state"
to BaseWorkflow. Automatically returns the state machine's current
state if self.state_machine exists (covers all StateMachine-based
agents without any override needed). Falls back to _workflow_state
string for non-state-machine agents.

Enables external callers to detect turn completion:
  "waiting_for_input" = agent is done, ready for next message
  "researching" = agent is still working

Companion to scaleapi/scale-agentex#173 which adds
GET /tasks/{task_id}/query/{query_name} endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
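The fallback behaviour described in this commit message can be sketched in plain Python as follows. The class names are hypothetical, the "initialized" default is taken from the earlier commit message, and the `@workflow.query(name="get_current_state")` decorator is deliberately omitted so the example runs without the Temporal SDK installed.

```python
class BaseWorkflowSketch:
    """Illustrative stand-in for BaseWorkflow, not the SDK class."""

    _workflow_state = "initialized"

    def get_current_state(self) -> str:
        # Prefer the state machine's state when the agent defines one.
        state_machine = getattr(self, "state_machine", None)
        if state_machine is not None:
            return state_machine.get_current_state()
        # Otherwise fall back to the plain string attribute.
        return self._workflow_state


class FakeStateMachine:
    """Minimal state machine double exposing get_current_state()."""

    def __init__(self, state: str) -> None:
        self._state = state

    def get_current_state(self) -> str:
        return self._state


class StateMachineAgent(BaseWorkflowSketch):
    """Agent that owns a state machine and needs no override."""

    def __init__(self) -> None:
        self.state_machine = FakeStateMachine("waiting_for_input")
```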
gabrycina added a commit to scaleapi/scale-agentex-python that referenced this pull request Mar 24, 2026
Adds a default @workflow.query handler named "get_current_state"
to BaseWorkflow. Returns "unknown" by default — agents override
to report their actual state.

Example for StateMachine-based agents:

    @workflow.query(name="get_current_state")
    def get_current_state(self) -> str:
        return self.state_machine.get_current_state()

Companion to scaleapi/scale-agentex#173 which adds
GET /tasks/{task_id}/query/{query_name} endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
handle.query(name, None) passes None as a positional argument
to the query handler, causing "takes 1 positional argument but
2 were given" for handlers that take no args.

Only pass arg when it's not None.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
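A minimal sketch of the conditional-argument pattern this commit describes. `FakeHandle` is a stand-in that only records the positional arguments it receives; it is not the Temporal SDK's handle class, and the free-standing `query_workflow` function is an assumption about the adapter method's shape.

```python
import asyncio
from typing import Any, Optional


class FakeHandle:
    """Stand-in for a workflow handle; NOT the Temporal SDK class."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    async def query(self, name: str, *args: Any) -> str:
        # Record exactly which positional arguments were forwarded.
        self.calls.append((name, *args))
        return "waiting_for_input"


async def query_workflow(handle: Any, query: str, arg: Optional[Any] = None) -> Any:
    """Forward arg to the handle only when the caller supplied one."""
    if arg is None:
        return await handle.query(query)
    return await handle.query(query, arg)


if __name__ == "__main__":
    demo_handle = FakeHandle()
    print(asyncio.run(query_workflow(demo_handle, "get_current_state")))
```

One consequence of this approach is that a caller can no longer pass a literal None as the query argument; a dedicated sentinel value would be needed if that case ever matters.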


@router.get(
"/{task_id}/query/{query_name}",

Can we add tests for this new route?

Comment on lines +235 to +238
result = await temporal_adapter.query_workflow(
    workflow_id=task_id,
    query=query_name,
)

P1 No validation that the task exists before querying Temporal

The endpoint passes task_id directly as workflow_id to Temporal without first checking that a task with this ID exists in the database. If a caller provides a valid-format UUID that doesn't correspond to a task (but does match a Temporal workflow started by another system, e.g., a healthcheck workflow), they'd get a result from the wrong workflow.

Other endpoints in this file first load the task via task_use_case.get_task(id=task_id) which returns a 404 if the task doesn't exist. This endpoint should do the same to maintain consistency and prevent querying unrelated workflows.

@gabrycina gabrycina merged commit 9a5fd64 into main Mar 26, 2026
28 checks passed
@gabrycina gabrycina deleted the feat/workflow-query-endpoint branch March 26, 2026 13:42