From 051483a579cd20872e09e82cdae1670be02c026e Mon Sep 17 00:00:00 2001 From: seemeroland Date: Wed, 20 May 2026 21:40:46 +0000 Subject: [PATCH] REMOTE-1752 Show cloud orchestration pills for child runs Co-Authored-By: Oz --- .../viewer/orchestration_viewer_model.rs | 166 ++++++++++++++---- .../orchestration_viewer_model_tests.rs | 95 ++++++++++ .../shared_session/viewer/terminal_manager.rs | 11 ++ 3 files changed, 241 insertions(+), 31 deletions(-) diff --git a/app/src/terminal/shared_session/viewer/orchestration_viewer_model.rs b/app/src/terminal/shared_session/viewer/orchestration_viewer_model.rs index 36addf1260..0d58fe0517 100644 --- a/app/src/terminal/shared_session/viewer/orchestration_viewer_model.rs +++ b/app/src/terminal/shared_session/viewer/orchestration_viewer_model.rs @@ -59,11 +59,24 @@ struct ChildAgentEntry { pane_materialization_requested: bool, } +struct ChildConversationUpdate { + task_id: AmbientAgentTaskId, + name: String, + fallback_title: String, + harness: Option, + parent_conversation_id: AIConversationId, + status: ConversationStatus, +} + /// Owns child discovery + status polling for a shared session viewer of an /// orchestrated session. pub struct OrchestrationViewerModel { /// Orchestrator run id; used as the `ancestor_run_id` fetch filter. parent_task_id: AmbientAgentTaskId, + /// Run id for the shared session the user actually opened. This differs + /// from `parent_task_id` when opening a child cloud orchestration agent + /// directly. + active_task_id: AmbientAgentTaskId, /// Owns the child conversations and anchors the orchestrator lookup. terminal_view_id: EntityId, /// Used to emit `EnsureSharedSessionViewerChildPane` on the parent's @@ -88,6 +101,7 @@ impl OrchestrationViewerModel { /// Kicks off the initial children fetch and schedules the first poll. pub fn new( parent_task_id: AmbientAgentTaskId, + active_task_id: AmbientAgentTaskId, terminal_view_id: EntityId, terminal_view: WeakViewHandle, ctx: &mut ModelContext, @@ -101,6 +115,7 @@ impl OrchestrationViewerModel { let mut model = Self { parent_task_id, + active_task_id, terminal_view_id, terminal_view, children: HashMap::new(), @@ -239,14 +254,26 @@ impl OrchestrationViewerModel { // `&mut self.children` borrow. let mut to_materialize: Vec<(AIConversationId, SessionId)> = Vec::new(); + let mut parent_task = None; + let mut child_tasks = Vec::new(); for task in tasks { // `ancestor_run_id` returns every descendant. Trust it for // membership (locally-spawned children may have empty or // sibling `parent_run_id`s), only skipping the parent itself. if task.task_id == self.parent_task_id { + parent_task = Some(task); continue; } + child_tasks.push(task); + } + + let Some(parent_conversation_id) = + self.ensure_parent_conversation(parent_task.as_ref(), ctx) + else { + return; + }; + for task in child_tasks { let task_id = task.task_id; let session_id = task .session_id @@ -290,13 +317,6 @@ impl OrchestrationViewerModel { continue; } - // New child: register under the orchestrator's local - // conversation. Without it, `start_new_child_conversation` - // would lose the parent linkage. Retry on the next poll. - let Some(parent_conversation_id) = self.find_parent_conversation_id(ctx) else { - continue; - }; - let name = task.display_name().to_string(); // Trim to stay in sync with `display_name()`, which also trims; // the descriptive title flows through `set_fallback_display_title` @@ -307,34 +327,19 @@ impl OrchestrationViewerModel { .as_ref() .and_then(|c| c.harness.as_ref()) .map(|h| h.harness_type); - let terminal_view_id = self.terminal_view_id; let status_for_initial = conversation_status.clone(); - let conversation_id = history_handle.update(ctx, |history, ctx| { - let conversation_id = history.start_new_child_conversation( - terminal_view_id, + let conversation_id = self.register_or_update_child_conversation( + ChildConversationUpdate { + task_id, name, - parent_conversation_id, + fallback_title, harness, - ctx, - ); - // Suppress server-side status reporting (viewer-side); also - // disambiguates viewer-spawned children downstream. - history.set_viewing_shared_session_for_conversation(conversation_id, true); - if let Some(conversation) = history.conversation_mut(&conversation_id) { - conversation.set_task_id(task_id); - if !fallback_title.is_empty() { - conversation.set_fallback_display_title(fallback_title); - } - } - history.update_conversation_status( - terminal_view_id, - conversation_id, - status_for_initial, - ctx, - ); - conversation_id - }); + parent_conversation_id, + status: status_for_initial, + }, + ctx, + ); let pane_materialization_requested = session_id.is_some(); if let Some(sid) = session_id { @@ -357,6 +362,105 @@ impl OrchestrationViewerModel { } } + fn ensure_parent_conversation( + &self, + parent_task: Option<&AmbientAgentTask>, + ctx: &mut ModelContext, + ) -> Option { + let history_handle = BlocklistAIHistoryModel::handle(ctx); + history_handle.update(ctx, |history, ctx| { + if self.active_task_id == self.parent_task_id { + let parent_conversation_id = + history.active_conversation_id(self.terminal_view_id)?; + if let Some(conversation) = history.conversation_mut(&parent_conversation_id) { + conversation.set_task_id(self.parent_task_id); + if conversation.agent_name().is_none() { + let name = parent_task + .map(|task| task.display_name().to_string()) + .unwrap_or_else(|| "Orchestrator".to_string()); + conversation.set_agent_name(name); + } + } + return Some(parent_conversation_id); + } + + if let Some(parent_conversation_id) = + history.conversation_id_for_agent_id(&self.parent_task_id.to_string()) + { + return Some(parent_conversation_id); + } + + let parent_conversation_id = + history.start_new_conversation(self.terminal_view_id, false, true, false, ctx); + history.assign_run_id_for_conversation( + parent_conversation_id, + self.parent_task_id.to_string(), + Some(self.parent_task_id), + self.terminal_view_id, + ctx, + ); + if let Some(conversation) = history.conversation_mut(&parent_conversation_id) { + let name = parent_task + .map(|task| task.display_name().to_string()) + .unwrap_or_else(|| "Orchestrator".to_string()); + conversation.set_agent_name(name); + if let Some(parent_task) = parent_task { + let fallback_title = parent_task.title.trim().to_string(); + if !fallback_title.is_empty() { + conversation.set_fallback_display_title(fallback_title); + } + } + } + Some(parent_conversation_id) + }) + } + + fn register_or_update_child_conversation( + &self, + update: ChildConversationUpdate, + ctx: &mut ModelContext, + ) -> AIConversationId { + let history_handle = BlocklistAIHistoryModel::handle(ctx); + history_handle.update(ctx, |history, ctx| { + let conversation_id = history + .conversation_id_for_agent_id(&update.task_id.to_string()) + .or_else(|| { + (update.task_id == self.active_task_id) + .then(|| history.active_conversation_id(self.terminal_view_id)) + .flatten() + }) + .unwrap_or_else(|| { + history.start_new_child_conversation( + self.terminal_view_id, + update.name.clone(), + update.parent_conversation_id, + update.harness, + ctx, + ) + }); + + history.set_parent_for_conversation(conversation_id, update.parent_conversation_id); + history.set_viewing_shared_session_for_conversation(conversation_id, true); + if let Some(conversation) = history.conversation_mut(&conversation_id) { + conversation.set_task_id(update.task_id); + conversation.set_agent_name(update.name); + if let Some(harness) = update.harness { + conversation.set_orchestration_harness(harness); + } + if !update.fallback_title.is_empty() { + conversation.set_fallback_display_title(update.fallback_title); + } + } + history.update_conversation_status( + self.terminal_view_id, + conversation_id, + update.status, + ctx, + ); + conversation_id + }) + } + /// Resolves the orchestrator's local conversation id via the view's /// active conversation, which `on_shared_init` sets on first join. fn find_parent_conversation_id(&self, ctx: &ModelContext) -> Option { diff --git a/app/src/terminal/shared_session/viewer/orchestration_viewer_model_tests.rs b/app/src/terminal/shared_session/viewer/orchestration_viewer_model_tests.rs index d0f8b648fa..ab909bfcc1 100644 --- a/app/src/terminal/shared_session/viewer/orchestration_viewer_model_tests.rs +++ b/app/src/terminal/shared_session/viewer/orchestration_viewer_model_tests.rs @@ -17,6 +17,7 @@ use chrono::Utc; use warpui::{App, EntityId, SingletonEntity}; use crate::ai::ambient_agents::task::{AgentConfigSnapshot, AmbientAgentTask}; +use crate::features::FeatureFlag; use crate::test_util::{add_window_with_terminal, terminal::initialize_app_for_terminal_view}; // ---- Pure-function tests ---------------------------------------------------- @@ -164,6 +165,7 @@ fn setup_model( let model = OrchestrationViewerModel { parent_task_id, + active_task_id: parent_task_id, terminal_view_id, terminal_view: terminal_view.downgrade(), children: HashMap::new(), @@ -229,6 +231,98 @@ fn registers_new_child_conversation() { }); } +#[test] +fn direct_child_viewer_anchors_pill_tree_on_parent_run() { + let _orchestration_v2 = FeatureFlag::OrchestrationV2.override_enabled(true); + + App::test((), |mut app| async move { + let parent = task_id(PARENT_TASK_ID); + let active_child = task_id(CHILD_A_TASK_ID); + + initialize_app_for_terminal_view(&mut app); + let terminal_view = add_window_with_terminal(&mut app, None); + let terminal_view_id = terminal_view.id(); + let active_child_conversation_id = + BlocklistAIHistoryModel::handle(&app).update(&mut app, |history, ctx| { + let id = history.start_new_conversation(terminal_view_id, false, true, false, ctx); + history.set_active_conversation_id(id, terminal_view_id, ctx); + if let Some(conversation) = history.conversation_mut(&id) { + conversation.set_task_id(active_child); + conversation.set_agent_name("Opened child".to_string()); + } + id + }); + + let model = OrchestrationViewerModel { + parent_task_id: parent, + active_task_id: active_child, + terminal_view_id, + terminal_view: terminal_view.downgrade(), + children: HashMap::new(), + polling_handle: None, + fetch_generation: 0, + }; + let model_handle = app.add_model(|_| model); + + model_handle.update(&mut app, |model, ctx| { + model.apply_children_fetch( + vec![ + make_task( + PARENT_TASK_ID, + AmbientAgentTaskState::InProgress, + "Coordinator", + None, + ), + make_task( + CHILD_A_TASK_ID, + AmbientAgentTaskState::InProgress, + "Opened child", + Some(SESSION_A), + ), + make_task( + CHILD_B_TASK_ID, + AmbientAgentTaskState::Queued, + "Sibling child", + None, + ), + ], + ctx, + ); + }); + + let history = BlocklistAIHistoryModel::handle(&app); + history.read(&app, |history, _| { + let parent_conversation_id = history + .conversation_id_for_agent_id(PARENT_TASK_ID) + .expect("parent run should get a local orchestrator conversation"); + assert_eq!( + history.active_conversation_id(terminal_view_id), + Some(active_child_conversation_id), + "opening a child run should keep that child selected" + ); + + let child_ids = history.child_conversation_ids_of(&parent_conversation_id); + assert_eq!( + child_ids.len(), + 2, + "parent should own opened child and sibling" + ); + assert!( + child_ids.contains(&active_child_conversation_id), + "opened child conversation should be reused in the pill tree" + ); + let active_child_conversation = history + .conversation(&active_child_conversation_id) + .expect("opened child conversation exists"); + assert_eq!( + active_child_conversation.parent_conversation_id(), + Some(parent_conversation_id) + ); + assert_eq!(active_child_conversation.agent_name(), Some("Opened child")); + }); + }); +} + #[test] fn skips_parent_task_id_as_child() { App::test((), |mut app| async move { @@ -280,6 +374,7 @@ fn skips_child_when_no_active_parent_conversation() { // registration should be deferred to the next poll. let model = OrchestrationViewerModel { parent_task_id: task_id(PARENT_TASK_ID), + active_task_id: task_id(PARENT_TASK_ID), terminal_view_id, terminal_view: terminal_view.downgrade(), children: HashMap::new(), diff --git a/app/src/terminal/shared_session/viewer/terminal_manager.rs b/app/src/terminal/shared_session/viewer/terminal_manager.rs index f70963d1fb..303feab7dd 100644 --- a/app/src/terminal/shared_session/viewer/terminal_manager.rs +++ b/app/src/terminal/shared_session/viewer/terminal_manager.rs @@ -20,6 +20,7 @@ use warpui::{ use crate::ai::active_agent_views_model::ActiveAgentViewsModel; use crate::ai::agent::conversation::ConversationStatus; +use crate::ai::agent_conversations_model::AgentConversationsModel; use crate::ai::blocklist::agent_view::{AgentViewController, AgentViewControllerEvent}; use crate::ai::blocklist::{ BlocklistAIContextEvent, BlocklistAIContextModel, BlocklistAIHistoryEvent, @@ -801,11 +802,21 @@ impl TerminalManager { && FeatureFlag::OrchestrationViewerPillBar.is_enabled() && orchestration_viewer_model.lock().is_none() { + let orchestration_root_task_id = AgentConversationsModel::as_ref(ctx) + .get_task_data(&task_id) + .and_then(|task| { + task.parent_run_id + .as_deref() + .filter(|id| !id.is_empty()) + .and_then(|id| id.parse().ok()) + }) + .unwrap_or(task_id); let weak_view_handle_for_orch = weak_view_handle.clone(); let orchestration_viewer_model_slot = orchestration_viewer_model.clone(); let handle = ctx.add_model(|model_ctx| { OrchestrationViewerModel::new( + orchestration_root_task_id, task_id, terminal_view_id, weak_view_handle_for_orch,