diff --git a/openviking/server/app.py b/openviking/server/app.py index 32b0fb597..7779af704 100644 --- a/openviking/server/app.py +++ b/openviking/server/app.py @@ -44,6 +44,14 @@ logger = get_logger(__name__) +def _apply_server_service_config(config: ServerConfig, service: OpenVikingService) -> None: + """Push server-only runtime config into service components.""" + service.resources.set_token_guardrails( + add_resource=config.token_guardrails.add_resource, + add_skill=config.token_guardrails.add_skill, + ) + + def create_app( config: Optional[ServerConfig] = None, service: Optional[OpenVikingService] = None, @@ -73,6 +81,7 @@ async def lifespan(app: FastAPI): assert service is not None set_service(service) + _apply_server_service_config(config, service) # Initialize APIKeyManager after service (needs VikingFS) effective_auth_mode = config.get_effective_auth_mode() @@ -153,6 +162,8 @@ async def lifespan(app: FastAPI): ) app.state.config = config + if service is not None: + _apply_server_service_config(config, service) # Add CORS middleware app.add_middleware( diff --git a/openviking/server/config.py b/openviking/server/config.py index 6a6b96f7d..50567cd1a 100644 --- a/openviking/server/config.py +++ b/openviking/server/config.py @@ -54,6 +54,15 @@ class ObservabilityConfig(BaseModel): model_config = {"extra": "forbid"} +class TokenGuardrailsConfig(BaseModel): + """Optional per-operation token limits for expensive ingest paths.""" + + add_resource: Optional[int] = Field(default=None, ge=1) + add_skill: Optional[int] = Field(default=None, ge=1) + + model_config = {"extra": "forbid"} + + class ServerConfig(BaseModel): host: str = "127.0.0.1" port: int = 1933 @@ -65,6 +74,7 @@ class ServerConfig(BaseModel): bot_api_url: str = "http://localhost:18790" # Vikingbot OpenAPIChannel URL (default port) encryption_enabled: bool = False # Whether API key hashing is enabled observability: ObservabilityConfig = Field(default_factory=ObservabilityConfig) + token_guardrails: TokenGuardrailsConfig = Field(default_factory=TokenGuardrailsConfig) model_config = {"extra": "forbid"} diff --git a/openviking/service/resource_service.py b/openviking/service/resource_service.py index f673effb9..f318af090 100644 --- a/openviking/service/resource_service.py +++ b/openviking/service/resource_service.py @@ -8,6 +8,7 @@ import json import time +from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional from openviking.server.identity import RequestContext @@ -34,6 +35,7 @@ DeadlineExceededError, InvalidArgumentError, NotInitializedError, + ResourceExhaustedError, ) from openviking_cli.utils import get_logger from openviking_cli.utils.uri import VikingURI @@ -45,6 +47,13 @@ logger = get_logger(__name__) +def _estimate_tokens_from_text(text: str) -> int: + normalized = text.strip() + if not normalized: + return 0 + return (len(normalized) + 3) // 4 + + class ResourceService: """Resource management service.""" @@ -61,6 +70,10 @@ def __init__( self._resource_processor = resource_processor self._skill_processor = skill_processor self._watch_scheduler = watch_scheduler + self._token_guardrails: Dict[str, Optional[int]] = { + "add_resource": None, + "add_skill": None, + } def set_dependencies( self, @@ -77,6 +90,16 @@ def set_dependencies( self._skill_processor = skill_processor self._watch_scheduler = watch_scheduler + def set_token_guardrails( + self, + *, + add_resource: Optional[int] = None, + add_skill: Optional[int] = None, + ) -> None: + """Configure per-operation token guardrails for ingest-heavy paths.""" + self._token_guardrails["add_resource"] = add_resource + self._token_guardrails["add_skill"] = add_skill + def _get_watch_manager(self) -> Optional["WatchManager"]: if not self._watch_scheduler: return None @@ -101,6 +124,78 @@ def _ensure_initialized(self) -> None: if not self._viking_fs: raise NotInitializedError("VikingFS") + def _estimate_path_payload_tokens(self, path: str) -> int: + candidate = Path(path) + try: + exists = candidate.exists() + except (OSError, ValueError): + exists = False + if exists: + if candidate.is_file(): + return (candidate.stat().st_size + 3) // 4 + if candidate.is_dir(): + total_bytes = 0 + for item in candidate.rglob("*"): + if item.is_file(): + try: + total_bytes += item.stat().st_size + except OSError: + continue + return (total_bytes + 3) // 4 + return _estimate_tokens_from_text(path) + + def _estimate_skill_payload_tokens(self, data: Any) -> int: + if data is None: + return 0 + if isinstance(data, Path): + return self._estimate_path_payload_tokens(str(data)) + if isinstance(data, str): + candidate = Path(data) + try: + exists = candidate.exists() + except (OSError, ValueError): + exists = False + if exists: + return self._estimate_path_payload_tokens(data) + return _estimate_tokens_from_text(data) + if isinstance(data, dict): + return _estimate_tokens_from_text(json.dumps(data, ensure_ascii=False)) + return _estimate_tokens_from_text(str(data)) + + def _enforce_token_guardrail( + self, + *, + operation: str, + estimated_tokens: int, + extra_tokens: int = 0, + ) -> None: + limit = self._token_guardrails.get(operation) + if limit is None: + return + + total_estimated = estimated_tokens + extra_tokens + telemetry = get_current_telemetry() + prefix = "resource" if operation == "add_resource" else "skill" + telemetry.set(f"{prefix}.guardrail.limit_tokens", limit) + telemetry.set(f"{prefix}.guardrail.estimated_tokens", total_estimated) + + if total_estimated <= limit: + telemetry.set(f"{prefix}.guardrail.blocked", False) + return + + telemetry.set(f"{prefix}.guardrail.blocked", True) + raise ResourceExhaustedError( + ( + f"{operation} estimated input tokens ({total_estimated}) exceed configured " + f"limit ({limit})" + ), + details={ + "operation": operation, + "estimated_tokens": total_estimated, + "limit_tokens": limit, + }, + ) + async def add_resource( self, path: str, @@ -173,6 +268,13 @@ async def add_resource( telemetry.set("resource.flags.watch_enabled", watch_enabled) try: + self._enforce_token_guardrail( + operation="add_resource", + estimated_tokens=self._estimate_path_payload_tokens(path), + extra_tokens=_estimate_tokens_from_text(reason) + + _estimate_tokens_from_text(instruction), + ) + # add_resource only supports resources scope if to and to.startswith("viking://"): parsed = VikingURI(to) @@ -434,6 +536,11 @@ async def add_skill( request_wait_tracker.register_request(telemetry_id) try: + self._enforce_token_guardrail( + operation="add_skill", + estimated_tokens=self._estimate_skill_payload_tokens(data), + ) + result = await self._skill_processor.process_skill( data=data, viking_fs=self._viking_fs, diff --git a/openviking_cli/client/http.py b/openviking_cli/client/http.py index 86e23468b..02023e00e 100644 --- a/openviking_cli/client/http.py +++ b/openviking_cli/client/http.py @@ -28,6 +28,7 @@ OpenVikingError, PermissionDeniedError, ProcessingError, + ResourceExhaustedError, SessionExpiredError, UnauthenticatedError, UnavailableError, @@ -48,6 +49,7 @@ "FAILED_PRECONDITION": FailedPreconditionError, "UNAUTHENTICATED": UnauthenticatedError, "PERMISSION_DENIED": PermissionDeniedError, + "RESOURCE_EXHAUSTED": ResourceExhaustedError, "UNAVAILABLE": UnavailableError, "INTERNAL": InternalError, "DEADLINE_EXCEEDED": DeadlineExceededError, diff --git a/openviking_cli/exceptions.py b/openviking_cli/exceptions.py index 476faf41d..9360950f4 100644 --- a/openviking_cli/exceptions.py +++ b/openviking_cli/exceptions.py @@ -103,6 +103,13 @@ def __init__(self, message: str = "Permission denied", resource: Optional[str] = super().__init__(message, code="PERMISSION_DENIED", details=details) +class ResourceExhaustedError(OpenVikingError): + """A configured quota or resource limit was exceeded.""" + + def __init__(self, message: str = "Resource exhausted", details: Optional[dict] = None): + super().__init__(message, code="RESOURCE_EXHAUSTED", details=details) + + # ============= Service Errors ============= diff --git a/tests/metrics/config/test_server_config.py b/tests/metrics/config/test_server_config.py index 47f1cc81b..286c6192c 100644 --- a/tests/metrics/config/test_server_config.py +++ b/tests/metrics/config/test_server_config.py @@ -57,3 +57,24 @@ def test_load_server_config_preserves_metrics_fields_under_server_observability( "openviking_http_requests_total", "openviking_task_pending", ] + + +def test_load_server_config_preserves_token_guardrails(tmp_path): + config_path = tmp_path / "ov.conf" + config_path.write_text( + json.dumps( + { + "server": { + "token_guardrails": { + "add_resource": 2048, + "add_skill": 1024, + } + } + } + ) + ) + + config = load_server_config(str(config_path)) + + assert config.token_guardrails.add_resource == 2048 + assert config.token_guardrails.add_skill == 1024 diff --git a/tests/server/test_error_scenarios.py b/tests/server/test_error_scenarios.py index 764a0631d..dfaf100f7 100644 --- a/tests/server/test_error_scenarios.py +++ b/tests/server/test_error_scenarios.py @@ -55,6 +55,27 @@ async def test_add_resource_file_not_found(client: httpx.AsyncClient): assert body["error"]["code"] == "PERMISSION_DENIED" +async def test_add_resource_token_guardrail_returns_structured_429( + client: httpx.AsyncClient, + service, +): + service.resources.set_token_guardrails(add_resource=8) + + resp = await client.post( + "/api/v1/resources", + json={ + "path": "https://example.com/" + ("long-segment-" * 40), + "reason": "guardrail test", + }, + ) + + body = resp.json() + assert resp.status_code == 429 + assert body["status"] == "error" + assert body["error"]["code"] == "RESOURCE_EXHAUSTED" + assert body["error"]["details"]["operation"] == "add_resource" + + async def test_empty_body_on_post(client: httpx.AsyncClient): """POST with empty body should return 422.""" resp = await client.post( diff --git a/tests/service/test_token_guardrails.py b/tests/service/test_token_guardrails.py new file mode 100644 index 000000000..01670c6c9 --- /dev/null +++ b/tests/service/test_token_guardrails.py @@ -0,0 +1,100 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +import asyncio +import sys +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +sys.modules.setdefault("volcenginesdkarkruntime", SimpleNamespace()) + +from openviking.server.identity import RequestContext, Role +from openviking.service.resource_service import ResourceService +from openviking_cli.exceptions import ResourceExhaustedError +from openviking_cli.session.user_id import UserIdentifier + + +@pytest.fixture +def request_context() -> RequestContext: + return RequestContext( + user=UserIdentifier("test_account", "test_user", "test_agent"), + role=Role.USER, + ) + + +@pytest.fixture +def resource_service() -> ResourceService: + return ResourceService( + vikingdb=SimpleNamespace(), + viking_fs=SimpleNamespace(), + resource_processor=SimpleNamespace( + process_resource=AsyncMock(return_value={"root_uri": "viking://resources/demo"}) + ), + skill_processor=SimpleNamespace( + process_skill=AsyncMock( + return_value={"status": "success", "uri": "viking://agent/skills/demo"} + ) + ), + ) + + +def test_add_resource_blocks_when_estimated_tokens_exceed_limit( + resource_service: ResourceService, + request_context: RequestContext, + tmp_path, +): + path = tmp_path / "large.md" + path.write_text("token-guard " * 200, encoding="utf-8") + resource_service.set_token_guardrails(add_resource=50) + + with pytest.raises(ResourceExhaustedError, match="add_resource estimated input tokens"): + asyncio.run( + resource_service.add_resource( + path=str(path), + ctx=request_context, + reason="large import", + ) + ) + + resource_service._resource_processor.process_resource.assert_not_awaited() + + +def test_add_skill_blocks_when_estimated_tokens_exceed_limit( + resource_service: ResourceService, + request_context: RequestContext, +): + resource_service.set_token_guardrails(add_skill=40) + skill_markdown = "# Demo Skill\n\n" + ("very detailed guidance " * 80) + + with pytest.raises(ResourceExhaustedError, match="add_skill estimated input tokens"): + asyncio.run( + resource_service.add_skill( + data=skill_markdown, + ctx=request_context, + ) + ) + + resource_service._skill_processor.process_skill.assert_not_awaited() + + +def test_add_resource_allows_requests_within_limit( + resource_service: ResourceService, + request_context: RequestContext, + tmp_path, +): + path = tmp_path / "small.md" + path.write_text("small file", encoding="utf-8") + resource_service.set_token_guardrails(add_resource=100) + + result = asyncio.run( + resource_service.add_resource( + path=str(path), + ctx=request_context, + reason="ok", + ) + ) + + assert result["root_uri"] == "viking://resources/demo" + resource_service._resource_processor.process_resource.assert_awaited_once()