Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions python/hopsworks/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@
"""MCP server for Hopsworks."""

from fastmcp import FastMCP
from starlette import status
from starlette.responses import Response

from .prompts import ProjectPrompts, SystemPrompts
from .resources import FeatureStoreResources, ProjectResources
from .tools import (
AuthTools,
BrewerTools,
DatasetTools,
FeatureGroupTools,
FeatureStoreTools,
JobTools,
ProjectTools,
UnixTools,
)


Expand All @@ -44,3 +48,13 @@
_job_tools = JobTools(mcp)
_dataset_tools = DatasetTools(mcp)
_feature_group_tools = FeatureGroupTools(mcp)
_unix_tools = UnixTools(mcp)
_brewer_tools = BrewerTools(mcp)


@mcp.custom_route("/health", methods=["GET"])
async def health(_):
return Response(status_code=status.HTTP_204_NO_CONTENT)


app = mcp.http_app()
2 changes: 2 additions & 0 deletions python/hopsworks/mcp/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
"""Tools for the Hopsworks MCP server."""

from .auth import AuthTools # noqa: F401
from .brewer import BrewerTools # noqa: F401
from .dataset import DatasetTools # noqa: F401
from .feature_group import FeatureGroupTools # noqa: F401
from .feature_store import FeatureStoreTools # noqa: F401
from .jobs import JobTools # noqa: F401
from .project import ProjectTools # noqa: F401
from .unix import UnixTools # noqa: F401
166 changes: 166 additions & 0 deletions python/hopsworks/mcp/tools/brewer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#
# Copyright 2025 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

import asyncio
import os
from pathlib import Path
from typing import TYPE_CHECKING

from fastmcp.server.dependencies import get_context
from fastmcp.server.http import _current_http_request
from filelock import AsyncFileLock
from hopsworks.mcp.utils.tags import TAGS
from pydantic import BaseModel


if TYPE_CHECKING:
from fastmcp import Context


class ExecutionResult(BaseModel):
output: str = ""
returncode: int | None = None


class BrewerTools:
def __init__(self, mcp):
"""Initialize the BrewerTools with the MCP server instance.

Parameters:
mcp: The MCP server instance
"""
self.mcp = mcp
self.mcp.tool(tags=[TAGS.BREWER])(self.execute)

# TODO: Use on_notification Middleware to handle cancellation requests, add process manager

async def execute(
self,
chat_id: int,
script_path: Path,
ctx: Context,
) -> ExecutionResult:
"""Execute a Python script in a conda environment for a specific chat."""
await ctx.info("Locking the chat environment for execution...\n")
chatdir = Path(f"/hopsfs/Brewer/{chat_id}")
async with AsyncFileLock(f"{chatdir}/.lock"):
await extract_hopsworks_credentials(chatdir)

envname = await get_chat_env(chat_id, chatdir)

await ctx.info("Executing the script...\n")
if not script_path.is_absolute():
script_path = (chatdir / script_path).resolve()

envcopy = os.environ.copy()
envcopy["SECRETS_DIR"] = str(chatdir)

proc = await asyncio.create_subprocess_exec(
"conda",
"run",
"-n",
envname,
"python",
script_path.name,
cwd=script_path.parent,
env=envcopy,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
if proc.stdout is None:
raise Exception("Unable to create env: stdout is None")
output = ""
while proc.returncode is None:
buf = await proc.stdout.readline()
if not buf:
break
b = buf.decode()
output += b
await ctx.info(b)

return ExecutionResult(
output=output,
returncode=proc.returncode,
)


async def extract_hopsworks_credentials(chatdir: Path):
ctx = get_context()
await ctx.info("Setting up authentication credentials...\n")
request = _current_http_request.get()
if not request:
raise Exception("No HTTP request found")
auth = next(
(
request.headers.get(key)
for key in request.headers
if key.lower() == "authorization"
),
None,
)
if not auth:
raise Exception("No authentication header found")
if auth.startswith("Bearer"):
with open(chatdir / "token.jwt", "w") as f:
f.write(auth.removeprefix("Bearer").strip())
elif auth.startswith("ApiKey"):
with open(chatdir / "api.key", "w") as f:
f.write(auth.removeprefix("ApiKey").strip())
else:
raise Exception("Unknown auth type")


async def get_chat_env(chat_id: int, chatdir: Path) -> str:
ctx = get_context()
await ctx.info("Checking if the chat environment is ready to be used...\n")
proc = await asyncio.create_subprocess_exec(
"conda",
"env",
"list",
cwd=chatdir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)
envlist, _ = await proc.communicate()
envname = f"chat{chat_id}"
if envname not in envlist.decode():
await ctx.info("Creating a new chat environment...\n")
proc = await asyncio.create_subprocess_exec(
"conda",
"create",
"--clone",
"hopsworks_environment",
"-n",
envname,
cwd=chatdir,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
if proc.stdout is None:
raise Exception("Unable to create env: stdout is None")
output = ""
while proc.returncode is None:
buf = await proc.stdout.readline()
if not buf:
break
b = buf.decode()
output += b
await ctx.info(b)
if proc.returncode:
raise Exception(f"Unable to create env ({proc.returncode}):\n{output}")
return envname
91 changes: 91 additions & 0 deletions python/hopsworks/mcp/tools/unix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#
# Copyright 2025 Hopsworks AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import subprocess
from queue import Empty, Queue
from threading import Thread

from hopsworks.mcp.utils.tags import TAGS


def enqueue_output(out, queue):
# TODO: obtain output byte-by-byte
for line in iter(out.readline, b""):
queue.put(line)
out.close()


class UnixTools:
"""Tools for managing jobs in Hopsworks and executing Unix/Git commands."""

def __init__(self, mcp):
"""Initialize the UnixTools with the MCP server instance.

Parameters:
mcp: The MCP server instance
"""
self.mcp = mcp
self.mcp.tool(tags=[TAGS.UNIX])(self.start_session)
self.mcp.tool(tags=[TAGS.UNIX])(self.add_input)
self.mcp.tool(tags=[TAGS.UNIX])(self.get_output)

self.sessions = {}

def start_session(self, cwd):
"""Start a Unix session in the specified directory.

Parameters:
cwd: The directory path to start the session in.

Returns:
UnixTools: An instance of UnixTools for the specified directory.
"""
# TODO: delete processes which were used longer than 5 min ago

proc = subprocess.Popen(
["bash"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=cwd,
text=True,
bufsize=1,
)

output_queue = Queue()
t = Thread(target=enqueue_output, args=(proc.stdout, output_queue))
t.daemon = True # thread dies with the program
t.start()

self.sessions[proc.pid] = (proc, output_queue, "")
# TODO: check for errors

print(proc.pid)
return proc.pid

def add_input(self, pid: int, addon: str):
proc, _, _ = self.sessions[pid]
proc.stdin.write(addon)
proc.stdin.flush()

def get_output(self, pid: int, offset: int = 0):
proc, output_queue, output = self.sessions[pid]
while True:
try:
output += output_queue.get_nowait()
except Empty:
self.sessions[pid] = (proc, output_queue, output)
return output[offset:]
2 changes: 2 additions & 0 deletions python/hopsworks/mcp/utils/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class TAGS:
ENVIRONMENT = "environment"
JOB = "job"
SYSTEM = "system"
UNIX = "unix"
BREWER = "brewer"

STATEFUL = "stateful"
STATELESS = "stateless"
Expand Down
19 changes: 19 additions & 0 deletions python/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import asyncio
import fastmcp
import time

Check failure on line 3 in python/test.py

View workflow job for this annotation

GitHub Actions / Lint and Stylecheck

Ruff (I001)

python/test.py:1:1: I001 Import block is un-sorted or un-formatted

async def main():
async with fastmcp.Client("http://localhost:8000/mcp") as client:
res = await client.call_tool("start_session", {"cwd": "/"})
pid = int(res.content[0].text)
await client.call_tool("add_input", {"pid": pid, "addon": "for i in {1..120}; do echo tick $i; sleep 1; done\n"})

offset = 0
while True:
res = await client.call_tool("get_output", {"pid": pid, "offset": offset})
addon = res.content[0].text
print(addon, end="", flush=True)
offset += len(addon)
time.sleep(0.1)

asyncio.run(main())
Loading