Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 82 additions & 88 deletions openviking/storage/viking_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from openviking.core.namespace import (
is_accessible as namespace_is_accessible,
)
from openviking.pyagfs.exceptions import AGFSClientError, AGFSDirectoryNotEmptyError, AGFSHTTPError
from openviking.pyagfs.exceptions import AGFSDirectoryNotEmptyError
from openviking.resource.watch_storage import is_watch_task_control_uri
from openviking.server.error_mapping import is_not_found_error, map_exception
from openviking.server.identity import RequestContext, Role
Expand Down Expand Up @@ -321,49 +321,79 @@ def _ensure_mutable_access(self, uri: str, ctx: Optional[RequestContext]) -> Non
resource=normalized_uri,
)

# ========== AGFS Basic Commands ==========
def _agfs_read(self, path: str, offset: int = 0, size: int = -1) -> Any:
"""Call AGFS read while tolerating backends that omit offset/size params."""
if offset == 0 and size == -1:
return self.agfs.read(path)
try:
return self.agfs.read(path, offset, size)
except TypeError:
return self.agfs.read(path)

async def read(
async def _read_path_bytes(
self,
uri: str,
path: str,
*,
ctx: Optional[RequestContext] = None,
offset: int = 0,
size: int = -1,
ctx: Optional[RequestContext] = None,
decrypt: bool = False,
require_exists: bool = False,
) -> bytes:
"""Read file"""
self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
"""Read bytes from AGFS with optional full-file decrypt and plaintext slicing."""
if require_exists:
self.agfs.stat(path)

if self._encryptor:
# When encryption is enabled: must read entire file for decryption
result = self.agfs.read(path, 0, -1)
if isinstance(result, bytes):
raw = result
elif result is not None and hasattr(result, "content"):
raw = result.content
else:
raw = b""
read_offset = 0 if decrypt and self._encryptor else offset
read_size = -1 if decrypt and self._encryptor else size
raw = self._handle_agfs_read(self._agfs_read(path, read_offset, read_size))

if decrypt:
raw = await self._decrypt_content(raw, ctx=ctx)

# Apply slicing on decrypted plaintext
if offset > 0 or size != -1:
if size != -1:
raw = raw[offset : offset + size]
else:
raw = raw[offset:]
else:
# When not encrypted: normal read
result = self.agfs.read(path, offset, size)
if isinstance(result, bytes):
raw = result
elif result is not None and hasattr(result, "content"):
raw = result.content
else:
raw = b""

return raw

async def _write_path_bytes(
self,
path: str,
content: bytes,
*,
ctx: Optional[RequestContext] = None,
encrypt: bool = False,
ensure_parent: bool = False,
) -> Any:
"""Write bytes to AGFS with optional parent creation and encryption."""
if ensure_parent:
await self._ensure_parent_dirs(path)
if encrypt:
content = await self._encrypt_content(content, ctx=ctx)
return self.agfs.write(path, content)

# ========== AGFS Basic Commands ==========

async def read(
self,
uri: str,
offset: int = 0,
size: int = -1,
ctx: Optional[RequestContext] = None,
) -> bytes:
"""Read file"""
self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
return await self._read_path_bytes(
path,
ctx=ctx,
offset=offset,
size=size,
decrypt=True,
)

async def write(
self,
uri: str,
Expand All @@ -375,9 +405,7 @@ async def write(
path = self._uri_to_path(uri, ctx=ctx)
if isinstance(data, str):
data = data.encode("utf-8")

data = await self._encrypt_content(data, ctx=ctx)
return self.agfs.write(path, data)
return await self._write_path_bytes(path, data, ctx=ctx, encrypt=True)

async def mkdir(
self,
Expand Down Expand Up @@ -922,7 +950,7 @@ async def abstract(
)
file_path = f"{path}/.abstract.md"
try:
content_bytes = self._handle_agfs_read(self.agfs.read(file_path))
content_bytes = await self._read_path_bytes(file_path, ctx=ctx, decrypt=True)
except Exception as exc:
if not is_not_found_error(exc):
mapped = map_exception(exc, resource=uri)
Expand All @@ -932,10 +960,6 @@ async def abstract(
# Fallback to default if .abstract.md doesn't exist
return f"# {uri}\n\n[Directory abstract is not ready]"

if self._encryptor:
real_ctx = self._ctx_or_default(ctx)
content_bytes = await self._encryptor.decrypt(real_ctx.account_id, content_bytes)

return self._decode_bytes(content_bytes)

async def overview(
Expand All @@ -960,7 +984,7 @@ async def overview(
)
file_path = f"{path}/.overview.md"
try:
content_bytes = self._handle_agfs_read(self.agfs.read(file_path))
content_bytes = await self._read_path_bytes(file_path, ctx=ctx, decrypt=True)
except Exception as exc:
if not is_not_found_error(exc):
mapped = map_exception(exc, resource=uri)
Expand All @@ -970,10 +994,6 @@ async def overview(
# Fallback to default if .overview.md doesn't exist
return f"# {uri}\n\n[Directory overview is not ready]"

if self._encryptor:
real_ctx = self._ctx_or_default(ctx)
content_bytes = await self._encryptor.decrypt(real_ctx.account_id, content_bytes)

return self._decode_bytes(content_bytes)

async def relations(
Expand Down Expand Up @@ -1713,9 +1733,7 @@ async def _write_relation_table(
table_path = f"{dir_path}/.relations.json"
if isinstance(content, str):
content = content.encode("utf-8")

content = await self._encrypt_content(content, ctx=ctx)
self.agfs.write(table_path, content)
await self._write_path_bytes(table_path, content, ctx=ctx, encrypt=True)

# ========== Batch Read (backward compatible) ==========

Expand Down Expand Up @@ -1747,13 +1765,10 @@ async def write_file(
"""Write file directly."""
self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
await self._ensure_parent_dirs(path)

if isinstance(content, str):
content = content.encode("utf-8")

content = await self._encrypt_content(content, ctx=ctx)
self.agfs.write(path, content)
await self._write_path_bytes(path, content, ctx=ctx, encrypt=True, ensure_parent=True)

async def read_file(
self,
Expand All @@ -1774,29 +1789,13 @@ async def read_file(
"""
self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
# Verify the file exists before reading, because AGFS read returns
# empty bytes for non-existent files instead of raising an error.
try:
self.agfs.stat(path)
except Exception:
raise NotFoundError(uri, "file")
try:
content = self.agfs.read(path)
if isinstance(content, bytes):
raw = content
elif content is not None and hasattr(content, "content"):
raw = content.content
else:
raw = b""

# If encryption is enabled, always decrypt full file first
if self._encryptor:
raw = await self._decrypt_content(raw, ctx=ctx)

text = self._decode_bytes(raw)
except Exception:
raw = await self._read_path_bytes(path, ctx=ctx, decrypt=True, require_exists=True)
except Exception as exc:
if not is_not_found_error(exc):
raise
raise NotFoundError(uri, "file")

text = self._decode_bytes(raw)
if offset == 0 and limit == -1:
return text
lines = text.splitlines(keepends=True)
Expand All @@ -1812,10 +1811,10 @@ async def read_file_bytes(
self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
try:
raw = self._handle_agfs_read(self.agfs.read(path))
raw = await self._decrypt_content(raw, ctx=ctx)
return raw
except Exception:
return await self._read_path_bytes(path, ctx=ctx, decrypt=True, require_exists=True)
except Exception as exc:
if not is_not_found_error(exc):
raise
raise NotFoundError(uri, "file")

async def write_file_bytes(
Expand All @@ -1827,10 +1826,7 @@ async def write_file_bytes(
"""Write single binary file."""
self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
await self._ensure_parent_dirs(path)

content = await self._encrypt_content(content, ctx=ctx)
self.agfs.write(path, content)
await self._write_path_bytes(path, content, ctx=ctx, encrypt=True, ensure_parent=True)

async def append_file(
self,
Expand All @@ -1845,21 +1841,19 @@ async def append_file(
try:
existing = ""
try:
existing_bytes = self._handle_agfs_read(self.agfs.read(path))
existing_bytes = await self._decrypt_content(existing_bytes, ctx=ctx)
existing_bytes = await self._read_path_bytes(
path, ctx=ctx, decrypt=True, require_exists=True
)
existing = self._decode_bytes(existing_bytes)
except FileNotFoundError:
pass
except AGFSHTTPError as e:
if e.status_code != 404:
except Exception as exc:
if not is_not_found_error(exc):
raise
except AGFSClientError:
raise
pass

await self._ensure_parent_dirs(path)
final_content = (existing + content).encode("utf-8")
final_content = await self._encrypt_content(final_content, ctx=ctx)
self.agfs.write(path, final_content)
await self._write_path_bytes(
path, final_content, ctx=ctx, encrypt=True, ensure_parent=True
)

except Exception as e:
logger.error(f"[VikingFS] Failed to append to file {uri}: {e}")
Expand Down
Loading
Loading