Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions ufo/client/mcp/local_servers/cli_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,48 @@
re.compile(r">{1,2}\s*[/\\]", re.IGNORECASE), # output redirection to paths
]

# start.exe flags that consume no additional argument
_START_FLAGS_NO_ARG: FrozenSet[str] = frozenset(
{
"/b", "/wait", "/min", "/max",
"/abovenormal", "/normal", "/belownormal",
"/low", "/idle", "/high", "/realtime",
"/affinity", "/node",
}
)

# start.exe flags that consume one additional argument
_START_FLAGS_ONE_ARG: FrozenSet[str] = frozenset({"/d"})


def _strip_start_prefix(tokens: List[str]) -> List[str]:
"""
If the token list begins with the Windows ``start`` shell command, strip
it along with any ``start`` flags so the returned list begins with the
actual executable. Returns the original list unchanged for all other
commands.
"""
if not tokens or tokens[0].lower() not in ("start", "start.exe"):
return tokens

idx = 1

# Skip optional window title: a non-flag token that looks like a plain
# label (no dot, no path separators) rather than an executable name.
if idx < len(tokens) and not tokens[idx].startswith("/"):
candidate = tokens[idx]
if "." not in candidate and "\\" not in candidate and "/" not in candidate:
idx += 1

# Skip /flag and /flag <value> pairs
while idx < len(tokens) and tokens[idx].startswith("/"):
flag = tokens[idx].lower()
idx += 1
if flag in _START_FLAGS_ONE_ARG and idx < len(tokens):
idx += 1 # consume the flag's argument

return tokens[idx:]


def _is_cli_command_allowed(command_str: str) -> bool:
"""
Expand All @@ -104,6 +146,12 @@ def _is_cli_command_allowed(command_str: str) -> bool:
except ValueError:
return False

if not tokens:
return False

# Unwrap Windows ``start`` launcher so we validate the real executable.
tokens = _strip_start_prefix(tokens)

if not tokens:
return False

Expand All @@ -127,6 +175,16 @@ def _is_cli_command_allowed(command_str: str) -> bool:
return True


def _resolve_launch_args(command_str: str) -> List[str]:
"""
Return the argument list to pass to ``subprocess.Popen``.
When the command begins with ``start``, the launcher prefix is stripped
so the application is launched directly without requiring ``shell=True``.
"""
tokens = shlex.split(command_str)
return _strip_start_prefix(tokens)


@MCPRegistry.register_factory_decorator("CommandLineExecutor")
def create_cli_mcp_server(*args, **kwargs) -> FastMCP:
"""
Expand Down Expand Up @@ -157,9 +215,9 @@ def run_shell(
)

try:
# Parse into argument list and launch without shell=True
# to prevent shell injection.
args = shlex.split(bash_command)
# Resolve to a direct executable invocation (strips ``start`` if
# present) and launch without shell=True to prevent shell injection.
args = _resolve_launch_args(bash_command)
subprocess.Popen(args, shell=False)
time.sleep(5) # Wait for the application to launch
except Exception as e:
Expand Down