Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions openviking/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@
logger = get_logger(__name__)


def _apply_server_service_config(config: ServerConfig, service: OpenVikingService) -> None:
"""Push server-only runtime config into service components."""
service.resources.set_token_guardrails(
add_resource=config.token_guardrails.add_resource,
add_skill=config.token_guardrails.add_skill,
)


def create_app(
config: Optional[ServerConfig] = None,
service: Optional[OpenVikingService] = None,
Expand Down Expand Up @@ -73,6 +81,7 @@ async def lifespan(app: FastAPI):

assert service is not None
set_service(service)
_apply_server_service_config(config, service)

# Initialize APIKeyManager after service (needs VikingFS)
effective_auth_mode = config.get_effective_auth_mode()
Expand Down Expand Up @@ -153,6 +162,8 @@ async def lifespan(app: FastAPI):
)

app.state.config = config
if service is not None:
_apply_server_service_config(config, service)

# Add CORS middleware
app.add_middleware(
Expand Down
10 changes: 10 additions & 0 deletions openviking/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ class ObservabilityConfig(BaseModel):
model_config = {"extra": "forbid"}


class TokenGuardrailsConfig(BaseModel):
"""Optional per-operation token limits for expensive ingest paths."""

add_resource: Optional[int] = Field(default=None, ge=1)
add_skill: Optional[int] = Field(default=None, ge=1)

model_config = {"extra": "forbid"}


class ServerConfig(BaseModel):
host: str = "127.0.0.1"
port: int = 1933
Expand All @@ -65,6 +74,7 @@ class ServerConfig(BaseModel):
bot_api_url: str = "http://localhost:18790" # Vikingbot OpenAPIChannel URL (default port)
encryption_enabled: bool = False # Whether API key hashing is enabled
observability: ObservabilityConfig = Field(default_factory=ObservabilityConfig)
token_guardrails: TokenGuardrailsConfig = Field(default_factory=TokenGuardrailsConfig)

model_config = {"extra": "forbid"}

Expand Down
107 changes: 107 additions & 0 deletions openviking/service/resource_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import json
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from openviking.server.identity import RequestContext
Expand All @@ -34,6 +35,7 @@
DeadlineExceededError,
InvalidArgumentError,
NotInitializedError,
ResourceExhaustedError,
)
from openviking_cli.utils import get_logger
from openviking_cli.utils.uri import VikingURI
Expand All @@ -45,6 +47,13 @@
logger = get_logger(__name__)


def _estimate_tokens_from_text(text: str) -> int:
normalized = text.strip()
if not normalized:
return 0
return (len(normalized) + 3) // 4


class ResourceService:
"""Resource management service."""

Expand All @@ -61,6 +70,10 @@ def __init__(
self._resource_processor = resource_processor
self._skill_processor = skill_processor
self._watch_scheduler = watch_scheduler
self._token_guardrails: Dict[str, Optional[int]] = {
"add_resource": None,
"add_skill": None,
}

def set_dependencies(
self,
Expand All @@ -77,6 +90,16 @@ def set_dependencies(
self._skill_processor = skill_processor
self._watch_scheduler = watch_scheduler

def set_token_guardrails(
self,
*,
add_resource: Optional[int] = None,
add_skill: Optional[int] = None,
) -> None:
"""Configure per-operation token guardrails for ingest-heavy paths."""
self._token_guardrails["add_resource"] = add_resource
self._token_guardrails["add_skill"] = add_skill

def _get_watch_manager(self) -> Optional["WatchManager"]:
if not self._watch_scheduler:
return None
Expand All @@ -101,6 +124,78 @@ def _ensure_initialized(self) -> None:
if not self._viking_fs:
raise NotInitializedError("VikingFS")

def _estimate_path_payload_tokens(self, path: str) -> int:
candidate = Path(path)
try:
exists = candidate.exists()
except (OSError, ValueError):
exists = False
if exists:
if candidate.is_file():
return (candidate.stat().st_size + 3) // 4
if candidate.is_dir():
total_bytes = 0
for item in candidate.rglob("*"):
if item.is_file():
try:
total_bytes += item.stat().st_size
except OSError:
continue
return (total_bytes + 3) // 4
return _estimate_tokens_from_text(path)

def _estimate_skill_payload_tokens(self, data: Any) -> int:
if data is None:
return 0
if isinstance(data, Path):
return self._estimate_path_payload_tokens(str(data))
if isinstance(data, str):
candidate = Path(data)
try:
exists = candidate.exists()
except (OSError, ValueError):
exists = False
if exists:
return self._estimate_path_payload_tokens(data)
return _estimate_tokens_from_text(data)
if isinstance(data, dict):
return _estimate_tokens_from_text(json.dumps(data, ensure_ascii=False))
return _estimate_tokens_from_text(str(data))

def _enforce_token_guardrail(
self,
*,
operation: str,
estimated_tokens: int,
extra_tokens: int = 0,
) -> None:
limit = self._token_guardrails.get(operation)
if limit is None:
return

total_estimated = estimated_tokens + extra_tokens
telemetry = get_current_telemetry()
prefix = "resource" if operation == "add_resource" else "skill"
telemetry.set(f"{prefix}.guardrail.limit_tokens", limit)
telemetry.set(f"{prefix}.guardrail.estimated_tokens", total_estimated)

if total_estimated <= limit:
telemetry.set(f"{prefix}.guardrail.blocked", False)
return

telemetry.set(f"{prefix}.guardrail.blocked", True)
raise ResourceExhaustedError(
(
f"{operation} estimated input tokens ({total_estimated}) exceed configured "
f"limit ({limit})"
),
details={
"operation": operation,
"estimated_tokens": total_estimated,
"limit_tokens": limit,
},
)

async def add_resource(
self,
path: str,
Expand Down Expand Up @@ -173,6 +268,13 @@ async def add_resource(
telemetry.set("resource.flags.watch_enabled", watch_enabled)

try:
self._enforce_token_guardrail(
operation="add_resource",
estimated_tokens=self._estimate_path_payload_tokens(path),
extra_tokens=_estimate_tokens_from_text(reason)
+ _estimate_tokens_from_text(instruction),
)

# add_resource only supports resources scope
if to and to.startswith("viking://"):
parsed = VikingURI(to)
Expand Down Expand Up @@ -434,6 +536,11 @@ async def add_skill(
request_wait_tracker.register_request(telemetry_id)

try:
self._enforce_token_guardrail(
operation="add_skill",
estimated_tokens=self._estimate_skill_payload_tokens(data),
)

result = await self._skill_processor.process_skill(
data=data,
viking_fs=self._viking_fs,
Expand Down
2 changes: 2 additions & 0 deletions openviking_cli/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
OpenVikingError,
PermissionDeniedError,
ProcessingError,
ResourceExhaustedError,
SessionExpiredError,
UnauthenticatedError,
UnavailableError,
Expand All @@ -48,6 +49,7 @@
"FAILED_PRECONDITION": FailedPreconditionError,
"UNAUTHENTICATED": UnauthenticatedError,
"PERMISSION_DENIED": PermissionDeniedError,
"RESOURCE_EXHAUSTED": ResourceExhaustedError,
"UNAVAILABLE": UnavailableError,
"INTERNAL": InternalError,
"DEADLINE_EXCEEDED": DeadlineExceededError,
Expand Down
7 changes: 7 additions & 0 deletions openviking_cli/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ def __init__(self, message: str = "Permission denied", resource: Optional[str] =
super().__init__(message, code="PERMISSION_DENIED", details=details)


class ResourceExhaustedError(OpenVikingError):
"""A configured quota or resource limit was exceeded."""

def __init__(self, message: str = "Resource exhausted", details: Optional[dict] = None):
super().__init__(message, code="RESOURCE_EXHAUSTED", details=details)


# ============= Service Errors =============


Expand Down
21 changes: 21 additions & 0 deletions tests/metrics/config/test_server_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,24 @@ def test_load_server_config_preserves_metrics_fields_under_server_observability(
"openviking_http_requests_total",
"openviking_task_pending",
]


def test_load_server_config_preserves_token_guardrails(tmp_path):
config_path = tmp_path / "ov.conf"
config_path.write_text(
json.dumps(
{
"server": {
"token_guardrails": {
"add_resource": 2048,
"add_skill": 1024,
}
}
}
)
)

config = load_server_config(str(config_path))

assert config.token_guardrails.add_resource == 2048
assert config.token_guardrails.add_skill == 1024
21 changes: 21 additions & 0 deletions tests/server/test_error_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@ async def test_add_resource_file_not_found(client: httpx.AsyncClient):
assert body["error"]["code"] == "PERMISSION_DENIED"


async def test_add_resource_token_guardrail_returns_structured_429(
client: httpx.AsyncClient,
service,
):
service.resources.set_token_guardrails(add_resource=8)

resp = await client.post(
"/api/v1/resources",
json={
"path": "https://example.com/" + ("long-segment-" * 40),
"reason": "guardrail test",
},
)

body = resp.json()
assert resp.status_code == 429
assert body["status"] == "error"
assert body["error"]["code"] == "RESOURCE_EXHAUSTED"
assert body["error"]["details"]["operation"] == "add_resource"


async def test_empty_body_on_post(client: httpx.AsyncClient):
"""POST with empty body should return 422."""
resp = await client.post(
Expand Down
Loading
Loading