diff --git a/python/hopsworks/mcp/server.py b/python/hopsworks/mcp/server.py index f675896bd..e8109b38a 100644 --- a/python/hopsworks/mcp/server.py +++ b/python/hopsworks/mcp/server.py @@ -17,16 +17,20 @@ """MCP server for Hopsworks.""" from fastmcp import FastMCP +from starlette import status +from starlette.responses import Response from .prompts import ProjectPrompts, SystemPrompts from .resources import FeatureStoreResources, ProjectResources from .tools import ( AuthTools, + BrewerTools, DatasetTools, FeatureGroupTools, FeatureStoreTools, JobTools, ProjectTools, + UnixTools, ) @@ -44,3 +48,13 @@ _job_tools = JobTools(mcp) _dataset_tools = DatasetTools(mcp) _feature_group_tools = FeatureGroupTools(mcp) +_unix_tools = UnixTools(mcp) +_brewer_tools = BrewerTools(mcp) + + +@mcp.custom_route("/health", methods=["GET"]) +async def health(_): + return Response(status_code=status.HTTP_204_NO_CONTENT) + + +app = mcp.http_app() diff --git a/python/hopsworks/mcp/tools/__init__.py b/python/hopsworks/mcp/tools/__init__.py index 44720321f..a2cb698e9 100644 --- a/python/hopsworks/mcp/tools/__init__.py +++ b/python/hopsworks/mcp/tools/__init__.py @@ -16,8 +16,10 @@ """Tools for the Hopsworks MCP server.""" from .auth import AuthTools # noqa: F401 +from .brewer import BrewerTools # noqa: F401 from .dataset import DatasetTools # noqa: F401 from .feature_group import FeatureGroupTools # noqa: F401 from .feature_store import FeatureStoreTools # noqa: F401 from .jobs import JobTools # noqa: F401 from .project import ProjectTools # noqa: F401 +from .unix import UnixTools # noqa: F401 diff --git a/python/hopsworks/mcp/tools/brewer.py b/python/hopsworks/mcp/tools/brewer.py new file mode 100644 index 000000000..0a6d2356a --- /dev/null +++ b/python/hopsworks/mcp/tools/brewer.py @@ -0,0 +1,166 @@ +# +# Copyright 2025 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import annotations + +import asyncio +import os +from pathlib import Path +from typing import TYPE_CHECKING + +from fastmcp.server.dependencies import get_context +from fastmcp.server.http import _current_http_request +from filelock import AsyncFileLock +from hopsworks.mcp.utils.tags import TAGS +from pydantic import BaseModel + + +if TYPE_CHECKING: + from fastmcp import Context + + +class ExecutionResult(BaseModel): + output: str = "" + returncode: int | None = None + + +class BrewerTools: + def __init__(self, mcp): + """Initialize the BrewerTools with the MCP server instance. + + Parameters: + mcp: The MCP server instance + """ + self.mcp = mcp + self.mcp.tool(tags=[TAGS.BREWER])(self.execute) + + # TODO: Use on_notification Middleware to handle cancellation requests, add process manager + + async def execute( + self, + chat_id: int, + script_path: Path, + ctx: Context, + ) -> ExecutionResult: + """Execute a Python script in a conda environment for a specific chat.""" + await ctx.info("Locking the chat environment for execution...\n") + chatdir = Path(f"/hopsfs/Brewer/{chat_id}") + async with AsyncFileLock(f"{chatdir}/.lock"): + await extract_hopsworks_credentials(chatdir) + + envname = await get_chat_env(chat_id, chatdir) + + await ctx.info("Executing the script...\n") + if not script_path.is_absolute(): + script_path = (chatdir / script_path).resolve() + + envcopy = os.environ.copy() + envcopy["SECRETS_DIR"] = str(chatdir) + + proc = await asyncio.create_subprocess_exec( + "conda", + "run", + "-n", + envname, + "python", + script_path.name, + cwd=script_path.parent, + env=envcopy, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + if proc.stdout is None: + raise Exception("Unable to create env: stdout is None") + output = "" + while proc.returncode is None: + buf = await proc.stdout.readline() + if not buf: + break + b = buf.decode() + output += b + await ctx.info(b) + + return ExecutionResult( + output=output, + returncode=proc.returncode, + ) + + +async def extract_hopsworks_credentials(chatdir: Path): + ctx = get_context() + await ctx.info("Setting up authentication credentials...\n") + request = _current_http_request.get() + if not request: + raise Exception("No HTTP request found") + auth = next( + ( + request.headers.get(key) + for key in request.headers + if key.lower() == "authorization" + ), + None, + ) + if not auth: + raise Exception("No authentication header found") + if auth.startswith("Bearer"): + with open(chatdir / "token.jwt", "w") as f: + f.write(auth.removeprefix("Bearer").strip()) + elif auth.startswith("ApiKey"): + with open(chatdir / "api.key", "w") as f: + f.write(auth.removeprefix("ApiKey").strip()) + else: + raise Exception("Unknown auth type") + + +async def get_chat_env(chat_id: int, chatdir: Path) -> str: + ctx = get_context() + await ctx.info("Checking if the chat environment is ready to be used...\n") + proc = await asyncio.create_subprocess_exec( + "conda", + "env", + "list", + cwd=chatdir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + envlist, _ = await proc.communicate() + envname = f"chat{chat_id}" + if envname not in envlist.decode(): + await ctx.info("Creating a new chat environment...\n") + proc = await asyncio.create_subprocess_exec( + "conda", + "create", + "--clone", + "hopsworks_environment", + "-n", + envname, + cwd=chatdir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + if proc.stdout is None: + raise Exception("Unable to create env: stdout is None") + output = "" + while proc.returncode is None: + buf = await proc.stdout.readline() + if not buf: + break + b = buf.decode() + output += b + await ctx.info(b) + if proc.returncode: + raise Exception(f"Unable to create env ({proc.returncode}):\n{output}") + return envname diff --git a/python/hopsworks/mcp/tools/unix.py b/python/hopsworks/mcp/tools/unix.py new file mode 100644 index 000000000..d2a9ae665 --- /dev/null +++ b/python/hopsworks/mcp/tools/unix.py @@ -0,0 +1,91 @@ +# +# Copyright 2025 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import subprocess +from queue import Empty, Queue +from threading import Thread + +from hopsworks.mcp.utils.tags import TAGS + + +def enqueue_output(out, queue): + # TODO: obtain output byte-by-byte + for line in iter(out.readline, b""): + queue.put(line) + out.close() + + +class UnixTools: + """Tools for managing jobs in Hopsworks and executing Unix/Git commands.""" + + def __init__(self, mcp): + """Initialize the UnixTools with the MCP server instance. + + Parameters: + mcp: The MCP server instance + """ + self.mcp = mcp + self.mcp.tool(tags=[TAGS.UNIX])(self.start_session) + self.mcp.tool(tags=[TAGS.UNIX])(self.add_input) + self.mcp.tool(tags=[TAGS.UNIX])(self.get_output) + + self.sessions = {} + + def start_session(self, cwd): + """Start a Unix session in the specified directory. + + Parameters: + cwd: The directory path to start the session in. + + Returns: + UnixTools: An instance of UnixTools for the specified directory. + """ + # TODO: delete processes which were used longer than 5 min ago + + proc = subprocess.Popen( + ["bash"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=cwd, + text=True, + bufsize=1, + ) + + output_queue = Queue() + t = Thread(target=enqueue_output, args=(proc.stdout, output_queue)) + t.daemon = True # thread dies with the program + t.start() + + self.sessions[proc.pid] = (proc, output_queue, "") + # TODO: check for errors + + print(proc.pid) + return proc.pid + + def add_input(self, pid: int, addon: str): + proc, _, _ = self.sessions[pid] + proc.stdin.write(addon) + proc.stdin.flush() + + def get_output(self, pid: int, offset: int = 0): + proc, output_queue, output = self.sessions[pid] + while True: + try: + output += output_queue.get_nowait() + except Empty: + self.sessions[pid] = (proc, output_queue, output) + return output[offset:] diff --git a/python/hopsworks/mcp/utils/tags.py b/python/hopsworks/mcp/utils/tags.py index 851c5481d..9771356a3 100644 --- a/python/hopsworks/mcp/utils/tags.py +++ b/python/hopsworks/mcp/utils/tags.py @@ -10,6 +10,8 @@ class TAGS: ENVIRONMENT = "environment" JOB = "job" SYSTEM = "system" + UNIX = "unix" + BREWER = "brewer" STATEFUL = "stateful" STATELESS = "stateless" diff --git a/python/test.py b/python/test.py new file mode 100644 index 000000000..d50f984ed --- /dev/null +++ b/python/test.py @@ -0,0 +1,19 @@ +import asyncio +import fastmcp +import time + +async def main(): + async with fastmcp.Client("http://localhost:8000/mcp") as client: + res = await client.call_tool("start_session", {"cwd": "/"}) + pid = int(res.content[0].text) + await client.call_tool("add_input", {"pid": pid, "addon": "for i in {1..120}; do echo tick $i; sleep 1; done\n"}) + + offset = 0 + while True: + res = await client.call_tool("get_output", {"pid": pid, "offset": offset}) + addon = res.content[0].text + print(addon, end="", flush=True) + offset += len(addon) + time.sleep(0.1) + +asyncio.run(main())