diff --git a/samples/server/README.md b/samples/server/README.md new file mode 100644 index 0000000..2e6bb59 --- /dev/null +++ b/samples/server/README.md @@ -0,0 +1,44 @@ +# MCP Auth sample servers + +This sample server folder contains sample servers that demonstrate how to use the MCP Auth Python SDK in various scenarios. + +See [the documentation](https://mcp-auth.dev/docs) for the full guide. + +## Get started + +### WhoAmI MCP server + +A simple server that demonstrates basic authentication. It provides a single tool: + +- `whoami`: Returns the authenticated user's information + +To run the WhoAmI server: +```bash +# Make sure you are in the server directory first +cd samples/server + +# Start the WhoAmI server +uvicorn whoami:app --host 0.0.0.0 --port 3001 +``` + +### Todo manager MCP server + +A more complex example demonstrating authentication and authorization with different permission scopes. It provides the following tools: + +- `create-todo`: Create a new todo (requires `create:todos` scope) +- `get-todos`: List todos (requires `read:todos` scope for all todos) +- `delete-todo`: Delete a todo (requires `delete:todos` scope for others' todos) + +To run the Todo Manager server: +```bash +# Make sure you are in the server directory first +cd samples/server + +# Start the Todo Manager server +uvicorn todo-manager.server:app --host 0.0.0.0 --port 3001 +``` + +## Environment variables + +Make sure to set the following environment variable before running the servers: +- `MCP_AUTH_ISSUER`: The URL of your MCP Auth server diff --git a/samples/server/todo-manager/server.py b/samples/server/todo-manager/server.py new file mode 100644 index 0000000..ace270c --- /dev/null +++ b/samples/server/todo-manager/server.py @@ -0,0 +1,133 @@ +""" +An FastMCP server that provides Todo management tools with authentication and authorization. + +This server demonstrates more complex authentication scenarios with different permission scopes: +- create-todo: Create a new todo (requires 'create:todos' scope) +- get-todos: List todos (requires 'read:todos' scope for all todos, otherwise only own todos) +- delete-todo: Delete a todo (requires 'delete:todos' scope for others' todos) + +This server is compatible with OpenID Connect (OIDC) providers and uses the `mcpauth` library +to handle authorization. Please check https://mcp-auth.dev/docs/tutorials/todo-manager for more +information on how to use this server. +""" + +import os +from typing import Any, List, Optional +from mcp.server.fastmcp import FastMCP +from starlette.applications import Starlette +from starlette.routing import Mount +from starlette.middleware import Middleware + +from mcpauth import MCPAuth +from mcpauth.config import AuthServerType +from mcpauth.exceptions import ( + MCPAuthBearerAuthException, + BearerAuthExceptionCode, +) +from mcpauth.types import AuthInfo +from mcpauth.utils import fetch_server_config +from .service import TodoService + +# Initialize the FastMCP server +mcp = FastMCP("Todo Manager") + +# Initialize the todo service +todo_service = TodoService() + +# Authorization server configuration +issuer_placeholder = "https://replace-with-your-issuer-url.com" +auth_issuer = os.getenv("MCP_AUTH_ISSUER", issuer_placeholder) + +if auth_issuer == issuer_placeholder: + raise ValueError( + "MCP_AUTH_ISSUER environment variable is not set. Please set it to your authorization server's issuer URL." + ) + +auth_server_config = fetch_server_config(auth_issuer, AuthServerType.OIDC) +mcp_auth = MCPAuth(server=auth_server_config) + +def assert_user_id(auth_info: Optional[AuthInfo]) -> str: + """Assert that auth_info contains a valid user ID and return it.""" + if not auth_info or not auth_info.subject: + raise Exception("Invalid auth info") + return auth_info.subject + + +def has_required_scopes(user_scopes: List[str], required_scopes: List[str]) -> bool: + """Check if user has all required scopes.""" + return all(scope in user_scopes for scope in required_scopes) + + +@mcp.tool() +def create_todo(content: str) -> dict[str, Any]: + """Create a new todo. Requires 'create:todos' scope.""" + auth_info = mcp_auth.auth_info + user_id = assert_user_id(auth_info) + + # Only users with 'create:todos' scope can create todos + user_scopes = auth_info.scopes if auth_info else [] + if not has_required_scopes(user_scopes, ["create:todos"]): + raise MCPAuthBearerAuthException(BearerAuthExceptionCode.MISSING_REQUIRED_SCOPES) + + created_todo = todo_service.create_todo(content=content, owner_id=user_id) + return created_todo + + +@mcp.tool() +def get_todos() -> dict[str, Any]: + """ + List todos. Users with 'read:todos' scope can see all todos, + otherwise they can only see their own todos. + """ + auth_info = mcp_auth.auth_info + user_id = assert_user_id(auth_info) + + # If user has 'read:todos' scope, they can access all todos + # If user doesn't have 'read:todos' scope, they can only access their own todos + user_scopes = auth_info.scopes if auth_info else [] + todo_owner_id = None if has_required_scopes(user_scopes, ["read:todos"]) else user_id + + todos = todo_service.get_all_todos(todo_owner_id) + return {"todos": todos} + + +@mcp.tool() +def delete_todo(id: str) -> dict[str, Any]: + """ + Delete a todo by id. Users can delete their own todos. + Users with 'delete:todos' scope can delete any todo. + """ + auth_info = mcp_auth.auth_info + user_id = assert_user_id(auth_info) + + todo = todo_service.get_todo_by_id(id) + + if not todo: + return {"error": "Failed to delete todo"} + + # Users can only delete their own todos + # Users with 'delete:todos' scope can delete any todo + user_scopes = auth_info.scopes if auth_info else [] + if todo.owner_id != user_id and not has_required_scopes(user_scopes, ["delete:todos"]): + return {"error": "Failed to delete todo"} + + deleted_todo = todo_service.delete_todo(id) + + if deleted_todo: + return { + "message": f"Todo {id} deleted", + "details": deleted_todo + } + else: + return {"error": "Failed to delete todo"} + +# Create the middleware and app +bearer_auth = Middleware(mcp_auth.bearer_auth_middleware('jwt')) +app = Starlette( + routes=[ + # Add the metadata route (`/.well-known/oauth-authorization-server`) + mcp_auth.metadata_route(), + # Protect the MCP server with the Bearer auth middleware + Mount("/", app=mcp.sse_app(), middleware=[bearer_auth]), + ], +) diff --git a/samples/server/todo-manager/service.py b/samples/server/todo-manager/service.py new file mode 100644 index 0000000..34dc718 --- /dev/null +++ b/samples/server/todo-manager/service.py @@ -0,0 +1,104 @@ +""" +A simple Todo service for demonstration purposes. +Uses an in-memory list to store todos. +""" + +from datetime import datetime +from typing import List, Optional, Dict, Any +import random +import string + +class Todo: + """Represents a todo item.""" + + def __init__(self, id: str, content: str, owner_id: str, created_at: str): + self.id = id + self.content = content + self.owner_id = owner_id + self.created_at = created_at + + def to_dict(self) -> Dict[str, Any]: + """Convert todo to dictionary for JSON serialization.""" + return { + "id": self.id, + "content": self.content, + "ownerId": self.owner_id, + "createdAt": self.created_at + } + + +class TodoService: + """A simple Todo service for demonstration purposes.""" + + def __init__(self): + self._todos: List[Todo] = [] + + def get_all_todos(self, owner_id: Optional[str] = None) -> List[Dict[str, Any]]: + """ + Get all todos, optionally filtered by owner_id. + + Args: + owner_id: If provided, only return todos owned by this user + + Returns: + List of todo dictionaries + """ + if owner_id: + filtered_todos = [todo for todo in self._todos if todo.owner_id == owner_id] + return [todo.to_dict() for todo in filtered_todos] + return [todo.to_dict() for todo in self._todos] + + def get_todo_by_id(self, todo_id: str) -> Optional[Todo]: + """ + Get a todo by its ID. + + Args: + todo_id: The ID of the todo to retrieve + + Returns: + Todo object if found, None otherwise + """ + for todo in self._todos: + if todo.id == todo_id: + return todo + return None + + def create_todo(self, content: str, owner_id: str) -> Dict[str, Any]: + """ + Create a new todo. + + Args: + content: The content of the todo + owner_id: The ID of the user who owns this todo + + Returns: + Dictionary representation of the created todo + """ + todo = Todo( + id=self._generate_id(), + content=content, + owner_id=owner_id, + created_at=datetime.now().isoformat() + ) + self._todos.append(todo) + return todo.to_dict() + + def delete_todo(self, todo_id: str) -> Optional[Dict[str, Any]]: + """ + Delete a todo by its ID. + + Args: + todo_id: The ID of the todo to delete + + Returns: + Dictionary representation of the deleted todo if found, None otherwise + """ + for i, todo in enumerate(self._todos): + if todo.id == todo_id: + deleted_todo = self._todos.pop(i) + return deleted_todo.to_dict() + return None + + def _generate_id(self) -> str: + """Generate a random ID for a todo.""" + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))