Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion src/uipath_langchain/agent/tools/mcp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@
from mcp import ClientSession
from mcp.shared.exceptions import McpError
from mcp.shared.message import SessionMessage
from mcp.types import CallToolResult, ListToolsResult
from mcp.types import (
CallToolRequest,
CallToolRequestParams,
CallToolResult,
ClientRequest,
CreateTaskResult,
ListToolsResult,
TaskMetadata,
)
from uipath._utils._ssl_context import get_httpx_client_kwargs
from uipath.runtime.base import UiPathDisposableProtocol

Expand Down Expand Up @@ -366,6 +374,39 @@ async def call_tool(
f"call_tool({name})",
)

async def call_tool_as_task(
self,
name: str,
arguments: dict[str, Any] | None = None,
) -> CreateTaskResult:
"""Call an MCP tool as a task (2025-11-25), returning a task handle.

Sends a task-augmented ``tools/call`` (``params.task``); a task-supporting
server responds with a ``CreateTaskResult`` (a handle to poll/await/drive)
instead of blocking for the tool result. The Python SDK has no client task
helper, so this sends the raw request via ``send_request``.

Args:
name: The name of the tool to call.
arguments: Optional arguments to pass to the tool.

Returns:
The ``CreateTaskResult`` returned by the server.
"""
return await self._execute_with_retry(
lambda session: session.send_request(
ClientRequest(
CallToolRequest(
params=CallToolRequestParams(
name=name, arguments=arguments, task=TaskMetadata()
)
)
),
CreateTaskResult,
),
f"call_tool_as_task({name})",
)

async def dispose(self) -> None:
"""Dispose of the client and release all resources.

Expand Down
115 changes: 111 additions & 4 deletions src/uipath_langchain/agent/tools/mcp/mcp_tool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Any, AsyncGenerator
Expand All @@ -6,11 +7,21 @@
from uipath.agent.models.agent import (
AgentMcpResourceConfig,
AgentMcpTool,
AgentMcpToolExecution,
CachedToolsConfig,
Comment on lines 7 to 11
DynamicToolsConfig,
McpToolTaskSupport,
)
from uipath.eval.mocks import mockable

from uipath.platform import UiPath
from uipath.platform.common import WaitJobRaw
from uipath.platform.orchestrator import Job, JobState

from uipath_langchain._utils.durable_interrupt import durable_interrupt
from uipath_langchain.agent.exceptions import (
AgentRuntimeError,
AgentRuntimeErrorCode,
)
from uipath_langchain.agent.tools.structured_tool_with_argument_properties import (
StructuredToolWithArgumentProperties,
)
Expand All @@ -20,6 +31,32 @@

logger: logging.Logger = logging.getLogger(__name__)

# _meta keys AgentHubService stamps on a task result to mark it as a UiPath job (see PR adding
# uipath.com/* markers). The MCP client reads them to drive the job as a suspendable child job.
_UIPATH_SOURCE_META_KEY = "uipath.com/source"
_UIPATH_JOB_KEY_META_KEY = "uipath.com/jobKey"
_UIPATH_FOLDER_KEY_META_KEY = "uipath.com/folderKey"
_UIPATH_ORCHESTRATOR_SOURCE = "orchestrator"


def _execution_from_server_tool(tool: Any) -> AgentMcpToolExecution | None:
"""Map an MCP server Tool's ``execution.taskSupport`` into the snapshot model (dynamic mode)."""
execution = getattr(tool, "execution", None)
task_support = getattr(execution, "taskSupport", None) if execution else None
if task_support is None:
return None
value = getattr(task_support, "value", task_support)
return AgentMcpToolExecution(task_support=McpToolTaskSupport(value))


def _is_task_augmentable(mcp_tool: AgentMcpTool) -> bool:
"""Whether the tool advertises MCP task support (``optional`` / ``required``)."""
execution = getattr(mcp_tool, "execution", None)
return execution is not None and execution.task_support in (
McpToolTaskSupport.OPTIONAL,
McpToolTaskSupport.REQUIRED,
)


@asynccontextmanager
async def open_mcp_tools(
Expand Down Expand Up @@ -118,6 +155,7 @@ async def create_mcp_tools(
input_schema=tool.inputSchema,
output_schema=tool.outputSchema,
argument_properties=argument_properties,
execution=_execution_from_server_tool(tool),
)
)
else:
Expand Down Expand Up @@ -155,18 +193,24 @@ def build_mcp_tool(mcp_tool: AgentMcpTool, mcpClient: McpClient) -> Any:
else:
output_schema = {"type": "object", "properties": {}}

task_augmentable = _is_task_augmentable(mcp_tool)

@mockable(
name=mcp_tool.name,
description=mcp_tool.description,
input_schema=mcp_tool.input_schema,
output_schema=output_schema,
)
async def tool_fn(**kwargs: Any) -> Any:
"""Execute MCP tool call with ephemeral session.
"""Execute an MCP tool call with an ephemeral session.

If a session disconnect error occurs (e.g., 404 or session terminated),
the tool will retry once by re-initializing the session.
When the tool supports MCP tasks, the call starts a UiPath job and suspends the
agent until it completes (see :func:`_invoke_mcp_tool_as_job`). Otherwise the tool
is called synchronously; a session disconnect (404) retries once.
"""
if task_augmentable:
return await _invoke_mcp_tool_as_job(mcp_tool, mcpClient, kwargs)

result = await mcpClient.call_tool(mcp_tool.name, arguments=kwargs)
logger.info(f"Tool call successful for {mcp_tool.name}")
content = result.content if hasattr(result, "content") else result
Expand All @@ -184,6 +228,69 @@ async def tool_fn(**kwargs: Any) -> Any:
return tool_fn


async def _invoke_mcp_tool_as_job(
mcp_tool: AgentMcpTool,
mcpClient: McpClient,
arguments: dict[str, Any],
) -> Any:
"""Call a task-supporting MCP tool and suspend the agent job until the child completes.

The task-augmented ``tools/call`` returns a ``CreateTaskResult`` whose ``_meta`` marks it
as a UiPath Orchestrator job. We then ``interrupt`` with a ``WaitJobRaw`` (exactly like
:func:`process_tool.create_process_tool`), so the runtime suspends the parent job and
resumes it with the child job's output when it finishes.

Args:
mcp_tool: The MCP tool being invoked.
mcpClient: The client used to start the task.
arguments: The tool-call arguments.

Returns:
The completed child job's output (parsed JSON when possible).
"""

@durable_interrupt
async def start_mcp_job():
create_result = await mcpClient.call_tool_as_task(
mcp_tool.name, arguments=arguments
)
meta = create_result.meta or {}
if meta.get(_UIPATH_SOURCE_META_KEY) != _UIPATH_ORCHESTRATOR_SOURCE:
raise AgentRuntimeError(
code=AgentRuntimeErrorCode.UNEXPECTED_ERROR,
title=f"Tool '{mcp_tool.name}' did not start a UiPath job",
detail=(
"The MCP server returned a task that is not a UiPath Orchestrator job "
"(missing the uipath.com/source marker), which is not supported."
),
)

return WaitJobRaw(
# The resume trigger keys off the job's GUID key (item_key = job.key) and re-fetches the
# job on resume; the numeric id is required by the model but unused here, hence the 0.
job=Job(
id=0,
key=meta.get(_UIPATH_JOB_KEY_META_KEY),
folder_key=meta.get(_UIPATH_FOLDER_KEY_META_KEY),
),
process_folder_key=meta.get(_UIPATH_FOLDER_KEY_META_KEY),
)
Comment on lines +257 to +277

# First run: starts the job and suspends. On resume: returns the resolved Job.
job = await start_mcp_job()

if (getattr(job, "state", None) or "").lower() == JobState.FAULTED:
return str(getattr(job, "info", None) or "Unknown error")

output_str = await UiPath().jobs.extract_output_async(job)
if output_str:
try:
return json.loads(output_str)
except (json.JSONDecodeError, TypeError):
return output_str
return output_str


async def create_mcp_tools_and_clients(
resources: list[AgentMcpResourceConfig],
session_info_factory: SessionInfoFactory | None = None,
Expand Down
144 changes: 144 additions & 0 deletions tests/agent/tools/test_mcp/test_mcp_task_suspend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Tests for suspend-on-UiPath-task (PR-B).

A task-supporting MCP tool, when called against a UiPath MCP server, returns a
CreateTaskResult whose _meta marks it as a UiPath job. The tool then interrupts with a
WaitJobRaw (like process_tool), suspending the parent agent job until the child completes.
"""

import datetime
from unittest.mock import AsyncMock, MagicMock, patch

from mcp.types import CreateTaskResult, Tool, ToolExecution
from uipath.agent.models.agent import AgentMcpTool, McpToolTaskSupport
from uipath.platform.common import WaitJobRaw

from uipath_langchain.agent.tools.mcp.mcp_client import McpClient
from uipath_langchain.agent.tools.mcp.mcp_tool import (
_execution_from_server_tool,
_is_task_augmentable,
build_mcp_tool,
)


def _mcp_tool(task_support: str | None) -> AgentMcpTool:
data: dict = {
"name": "invoke-process",
"description": "Run a process",
"inputSchema": {"type": "object", "properties": {}},
}
Comment on lines +23 to +28
if task_support is not None:
data["execution"] = {"taskSupport": task_support}
return AgentMcpTool.model_validate(data)


def _create_task_result(source: str = "orchestrator") -> CreateTaskResult:
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
return CreateTaskResult.model_validate(
{
"task": {
"taskId": "job-key-1",
"status": "working",
"createdAt": now,
"lastUpdatedAt": now,
"ttl": 86_400_000,
},
"_meta": {
"uipath.com/source": source,
"uipath.com/jobKey": "job-key-1",
"uipath.com/folderKey": "folder-key-1",
},
}
)


class TestTaskAugmentableDetection:
def test_optional_and_required_are_augmentable(self) -> None:
assert _is_task_augmentable(_mcp_tool("optional")) is True
assert _is_task_augmentable(_mcp_tool("required")) is True

def test_forbidden_and_missing_are_not(self) -> None:
assert _is_task_augmentable(_mcp_tool("forbidden")) is False
assert _is_task_augmentable(_mcp_tool(None)) is False

def test_execution_mapped_from_server_tool(self) -> None:
tool = Tool(
name="p",
description="d",
inputSchema={"type": "object", "properties": {}},
execution=ToolExecution(taskSupport="optional"),
)
execution = _execution_from_server_tool(tool)
assert execution is not None
assert execution.task_support == McpToolTaskSupport.OPTIONAL

def test_execution_none_when_server_tool_has_no_execution(self) -> None:
tool = Tool(
name="p",
description="d",
inputSchema={"type": "object", "properties": {}},
)
assert _execution_from_server_tool(tool) is None


class TestCallToolAsTask:
async def test_sends_task_augmented_request(self) -> None:
client = McpClient(config=MagicMock())
session = MagicMock()
create_result = _create_task_result()
session.send_request = AsyncMock(return_value=create_result)
client._ensure_session = AsyncMock(return_value=session) # type: ignore[method-assign]
client._client_initialized = True

result = await client.call_tool_as_task("invoke-process", arguments={"a": 1})

assert result is create_result
sent_request = session.send_request.call_args[0][0]
call_tool_request = sent_request.root
assert call_tool_request.params.name == "invoke-process"
assert call_tool_request.params.task is not None


class TestSuspendOnUiPathTask:
@patch("uipath_langchain._utils.durable_interrupt.decorator.interrupt")
@patch("uipath_langchain.agent.tools.mcp.mcp_tool.UiPath")
async def test_task_tool_starts_job_and_suspends_with_waitjob(
self, mock_uipath: MagicMock, mock_interrupt: MagicMock
) -> None:
mcp_client = MagicMock()
mcp_client.call_tool_as_task = AsyncMock(return_value=_create_task_result())

resumed_job = MagicMock()
resumed_job.state = "successful"
mock_interrupt.return_value = resumed_job

sdk = MagicMock()
sdk.jobs.extract_output_async = AsyncMock(return_value='{"out": 1}')
mock_uipath.return_value = sdk

tool_fn = build_mcp_tool(_mcp_tool("optional"), mcp_client)
result = await tool_fn(invoiceId="INV-1")

mcp_client.call_tool_as_task.assert_awaited_once()
# Suspended on a WaitJobRaw carrying the job + folder keys read from _meta.
wait = mock_interrupt.call_args[0][0]
assert isinstance(wait, WaitJobRaw)
assert str(wait.job.key) == "job-key-1"
assert str(wait.process_folder_key) == "folder-key-1"
# Resume returns the child job's output.
assert result == {"out": 1}

@patch("uipath_langchain._utils.durable_interrupt.decorator.interrupt")
async def test_non_task_tool_calls_synchronously(
self, mock_interrupt: MagicMock
) -> None:
mcp_client = MagicMock()
sync_result = MagicMock()
sync_result.content = "sync-result"
mcp_client.call_tool = AsyncMock(return_value=sync_result)

tool_fn = build_mcp_tool(_mcp_tool(None), mcp_client)
result = await tool_fn(x=1)

mcp_client.call_tool.assert_awaited_once()
mock_interrupt.assert_not_called()
assert result == "sync-result"