Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 133 additions & 34 deletions app/src/ai/skills/file_watchers/skill_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};

use ai::skills::{
home_skills_path, parse_skill, ParsedSkill, SkillProvider, SKILL_PROVIDER_DEFINITIONS,
get_provider_for_path, home_skills_path, parse_skill, parse_skill_content_at_location,
ParsedSkill, SkillProvider, SkillScope, SKILL_PROVIDER_DEFINITIONS,
};
use async_channel::Sender;
use futures::future::BoxFuture;
use remote_server::proto::{
file_context_proto, FileContextProto, ReadFileContextFile, ReadFileContextRequest,
};
use repo_metadata::repositories::DetectedRepositories;
use repo_metadata::repository::{Repository, SubscriberId};
use repo_metadata::{DirectoryWatcher, RepoMetadataModel, RepositoryIdentifier, RepositoryUpdate};
Expand All @@ -20,6 +26,7 @@ use super::utils::{
is_home_skill_directory, is_skill_file, read_local_project_skills_from_filesystem,
read_skills_from_directories, read_skills_from_files,
};
use crate::remote_server::manager::RemoteServerManager;
use crate::warp_managed_paths_watcher::{
filter_repository_update_by_prefix, warp_managed_skill_dirs, WarpManagedPathsWatcher,
WarpManagedPathsWatcherEvent,
Expand All @@ -30,15 +37,20 @@ pub enum SkillWatcherEvent {
SkillsAdded { skills: Vec<ParsedSkill> },
SkillsDeleted { paths: Vec<LocalOrRemotePath> },
}

const REMOTE_SKILL_MAX_FILE_BYTES: u32 = 1024 * 1024;
const REMOTE_SKILL_MAX_BATCH_BYTES: u32 = 5 * 1024 * 1024;
type ProjectSkillContentsFuture =
BoxFuture<'static, anyhow::Result<Vec<(LocalOrRemotePath, String)>>>;
pub struct SkillWatcher {
// Channel for sending repository messages from subscribers.
repository_message_tx: Sender<SkillRepositoryMessage>,
/// Last known project skill files by repository. Project skill counts are small,
/// so repo metadata changes trigger a full refresh instead of a subtree diff.
project_skill_files_by_repo: HashMap<RepositoryIdentifier, HashSet<LocalOrRemotePath>>,
/// Latest full project-skill refresh generation by repository. Repo metadata refreshes
/// parse local files asynchronously, so results from superseded tree snapshots must
/// not re-add deleted skills or overwrite newer parsed content.
/// hydrate project skills asynchronously, so results from superseded tree snapshots
/// must not re-add deleted skills or overwrite newer parsed content.
project_skill_refresh_generations: HashMap<RepositoryIdentifier, u64>,
/// Allocates refresh generations that cannot be reused if a repository is removed
/// and subsequently re-added while an old task is still in flight.
Expand Down Expand Up @@ -237,29 +249,21 @@ impl SkillWatcher {

let deleted_paths = previous_skill_files
.difference(&current_skill_files)
.filter_map(|path| path.to_local_path().map(Path::to_path_buf))
.cloned()
.collect::<Vec<_>>();
if !deleted_paths.is_empty() {
self.cleanup_symlink_watches(&deleted_paths);
let deleted_local_paths = deleted_paths
.iter()
.filter_map(|path| path.to_local_path().map(Path::to_path_buf))
.collect::<Vec<_>>();
self.cleanup_symlink_watches(&deleted_local_paths);
let _ = self
.watcher_event_tx
.try_send(SkillWatcherEvent::SkillsDeleted {
paths: deleted_paths
.into_iter()
.map(LocalOrRemotePath::Local)
.collect(),
paths: deleted_paths,
});
}

// Local hydration intentionally only parses local paths. Remote project
// skill discovery now comes from RepoMetadataModel, but hydrating/parsing
// remote content is coming in a subsequent PR with a remote-aware skill
// identity pipeline.
let skill_files = current_skill_files
.iter()
.filter_map(|path| path.to_local_path().map(Path::to_path_buf))
.collect::<Vec<_>>();

// Project skill counts are expected to be small, so repo metadata updates
// intentionally trigger a full refresh instead of attempting to diff the
// changed subtree. This keeps local and remote project-skill behavior on
Expand All @@ -268,7 +272,7 @@ impl SkillWatcher {
self.spawn_read_project_skills_from_files(
repo_id.clone(),
refresh_generation,
skill_files,
current_skill_files.iter().cloned().collect(),
ctx,
);

Expand Down Expand Up @@ -367,18 +371,26 @@ impl SkillWatcher {
&mut self,
repo_id: RepositoryIdentifier,
refresh_generation: u64,
skill_files: impl IntoIterator<Item = PathBuf>,
skill_paths: Vec<LocalOrRemotePath>,
ctx: &mut ModelContext<Self>,
) {
let skill_files: Vec<_> = skill_files.into_iter().collect();
if skill_files.is_empty() {
if skill_paths.is_empty() {
return;
}
let Some(read_skill_contents) = read_project_skill_contents(skill_paths, ctx) else {
return;
};

ctx.spawn(
async move { read_skills_from_files(skill_files) },
move |me, skills, ctx| {
me.emit_project_skills_if_current(&repo_id, refresh_generation, skills, ctx);
async move {
let skill_contents = read_skill_contents.await?;
Ok::<Vec<ParsedSkill>, anyhow::Error>(parse_project_skill_contents(skill_contents))
},
move |me, skills, ctx| match skills {
Ok(skills) => {
me.emit_project_skills_if_current(&repo_id, refresh_generation, skills, ctx);
}
Err(err) => log::warn!("Failed to read project skills: {err}"),
},
);
}
Expand Down Expand Up @@ -432,19 +444,17 @@ impl SkillWatcher {
let Some(skill_files) = self.project_skill_files_by_repo.remove(repo_id) else {
return;
};
let deleted_paths = skill_files
.into_iter()
.filter_map(|path| path.to_local_path().map(Path::to_path_buf))
.collect::<Vec<_>>();
let deleted_paths = skill_files.into_iter().collect::<Vec<_>>();
if !deleted_paths.is_empty() {
self.cleanup_symlink_watches(&deleted_paths);
let deleted_local_paths = deleted_paths
.iter()
.filter_map(|path| path.to_local_path().map(Path::to_path_buf))
.collect::<Vec<_>>();
self.cleanup_symlink_watches(&deleted_local_paths);
let _ = self
.watcher_event_tx
.try_send(SkillWatcherEvent::SkillsDeleted {
paths: deleted_paths
.into_iter()
.map(LocalOrRemotePath::Local)
.collect(),
paths: deleted_paths,
});
}
}
Expand Down Expand Up @@ -1044,6 +1054,95 @@ impl SkillWatcher {
}
}

fn read_project_skill_contents(
skill_paths: Vec<LocalOrRemotePath>,
ctx: &AppContext,
) -> Option<ProjectSkillContentsFuture> {
match skill_paths.first()? {
LocalOrRemotePath::Local(_) => Some(Box::pin(async move {
Ok(read_local_project_skill_contents(skill_paths))
})),
LocalOrRemotePath::Remote(remote) => {
let client = RemoteServerManager::as_ref(ctx)
.client_for_host(&remote.host_id)?
.clone();
Some(Box::pin(async move {
let request = remote_skill_read_request(&skill_paths);
let response = client.read_file_context(request).await?;
Ok(read_remote_project_skill_contents(
skill_paths,
response.file_contexts,
))
}))
}
}
}
fn remote_skill_read_request(skill_paths: &[LocalOrRemotePath]) -> ReadFileContextRequest {
ReadFileContextRequest {
files: skill_paths
.iter()
.filter_map(|path| match path {
LocalOrRemotePath::Remote(remote) => Some(ReadFileContextFile {
path: remote.path.as_str().to_string(),
line_ranges: Vec::new(),
}),
LocalOrRemotePath::Local(_) => None,
})
.collect(),
max_file_bytes: Some(REMOTE_SKILL_MAX_FILE_BYTES),
max_batch_bytes: Some(REMOTE_SKILL_MAX_BATCH_BYTES),
}
}

fn read_local_project_skill_contents(
skill_paths: Vec<LocalOrRemotePath>,
) -> Vec<(LocalOrRemotePath, String)> {
skill_paths
.into_iter()
.filter_map(|path| {
let content = fs::read_to_string(path.to_local_path()?).ok()?;
Some((path, content))
})
.collect()
}

fn read_remote_project_skill_contents(
skill_paths: Vec<LocalOrRemotePath>,
file_contexts: Vec<FileContextProto>,
) -> Vec<(LocalOrRemotePath, String)> {
let text_content_by_path = file_contexts
.into_iter()
.filter_map(|file_context| {
let file_context_proto::Content::TextContent(content) = file_context.content? else {
return None;
};
Some((file_context.file_name, content))
})
.collect::<HashMap<_, _>>();

skill_paths
.into_iter()
.filter_map(|path| {
let LocalOrRemotePath::Remote(remote) = &path else {
return None;
};
let content = text_content_by_path.get(remote.path.as_str())?.clone();
Some((path, content))
})
.collect()
}

fn parse_project_skill_contents(
skill_contents: Vec<(LocalOrRemotePath, String)>,
) -> Vec<ParsedSkill> {
skill_contents
.into_iter()
.filter_map(|(path, content)| {
let provider = get_provider_for_path(&path).unwrap_or(SkillProvider::Agents);
parse_skill_content_at_location(path, &content, provider, SkillScope::Project).ok()
})
.collect()
}
impl Entity for SkillWatcher {
type Event = SkillWatcherEvent;
}
Expand Down
Loading