Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 135 additions & 31 deletions app/src/terminal/shared_session/viewer/orchestration_viewer_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,24 @@ struct ChildAgentEntry {
pane_materialization_requested: bool,
}

struct ChildConversationUpdate {
task_id: AmbientAgentTaskId,
name: String,
fallback_title: String,
harness: Option<warp_cli::agent::Harness>,
parent_conversation_id: AIConversationId,
status: ConversationStatus,
}

/// Owns child discovery + status polling for a shared session viewer of an
/// orchestrated session.
pub struct OrchestrationViewerModel {
/// Orchestrator run id; used as the `ancestor_run_id` fetch filter.
parent_task_id: AmbientAgentTaskId,
/// Run id for the shared session the user actually opened. This differs
/// from `parent_task_id` when opening a child cloud orchestration agent
/// directly.
active_task_id: AmbientAgentTaskId,
/// Owns the child conversations and anchors the orchestrator lookup.
terminal_view_id: EntityId,
/// Used to emit `EnsureSharedSessionViewerChildPane` on the parent's
Expand All @@ -88,6 +101,7 @@ impl OrchestrationViewerModel {
/// Kicks off the initial children fetch and schedules the first poll.
pub fn new(
parent_task_id: AmbientAgentTaskId,
active_task_id: AmbientAgentTaskId,
terminal_view_id: EntityId,
terminal_view: WeakViewHandle<TerminalView>,
ctx: &mut ModelContext<Self>,
Expand All @@ -101,6 +115,7 @@ impl OrchestrationViewerModel {

let mut model = Self {
parent_task_id,
active_task_id,
terminal_view_id,
terminal_view,
children: HashMap::new(),
Expand Down Expand Up @@ -239,14 +254,26 @@ impl OrchestrationViewerModel {
// `&mut self.children` borrow.
let mut to_materialize: Vec<(AIConversationId, SessionId)> = Vec::new();

let mut parent_task = None;
let mut child_tasks = Vec::new();
for task in tasks {
// `ancestor_run_id` returns every descendant. Trust it for
// membership (locally-spawned children may have empty or
// sibling `parent_run_id`s), only skipping the parent itself.
if task.task_id == self.parent_task_id {
parent_task = Some(task);
continue;
}
child_tasks.push(task);
}

let Some(parent_conversation_id) =
self.ensure_parent_conversation(parent_task.as_ref(), ctx)
else {
return;
};

for task in child_tasks {
let task_id = task.task_id;
let session_id = task
.session_id
Expand Down Expand Up @@ -290,13 +317,6 @@ impl OrchestrationViewerModel {
continue;
}

// New child: register under the orchestrator's local
// conversation. Without it, `start_new_child_conversation`
// would lose the parent linkage. Retry on the next poll.
let Some(parent_conversation_id) = self.find_parent_conversation_id(ctx) else {
continue;
};

let name = task.display_name().to_string();
// Trim to stay in sync with `display_name()`, which also trims;
// the descriptive title flows through `set_fallback_display_title`
Expand All @@ -307,34 +327,19 @@ impl OrchestrationViewerModel {
.as_ref()
.and_then(|c| c.harness.as_ref())
.map(|h| h.harness_type);
let terminal_view_id = self.terminal_view_id;
let status_for_initial = conversation_status.clone();

let conversation_id = history_handle.update(ctx, |history, ctx| {
let conversation_id = history.start_new_child_conversation(
terminal_view_id,
let conversation_id = self.register_or_update_child_conversation(
ChildConversationUpdate {
task_id,
name,
parent_conversation_id,
fallback_title,
harness,
ctx,
);
// Suppress server-side status reporting (viewer-side); also
// disambiguates viewer-spawned children downstream.
history.set_viewing_shared_session_for_conversation(conversation_id, true);
if let Some(conversation) = history.conversation_mut(&conversation_id) {
conversation.set_task_id(task_id);
if !fallback_title.is_empty() {
conversation.set_fallback_display_title(fallback_title);
}
}
history.update_conversation_status(
terminal_view_id,
conversation_id,
status_for_initial,
ctx,
);
conversation_id
});
parent_conversation_id,
status: status_for_initial,
},
ctx,
);

let pane_materialization_requested = session_id.is_some();
if let Some(sid) = session_id {
Expand All @@ -357,6 +362,105 @@ impl OrchestrationViewerModel {
}
}

fn ensure_parent_conversation(
&self,
parent_task: Option<&AmbientAgentTask>,
ctx: &mut ModelContext<Self>,
) -> Option<AIConversationId> {
let history_handle = BlocklistAIHistoryModel::handle(ctx);
history_handle.update(ctx, |history, ctx| {
if self.active_task_id == self.parent_task_id {
let parent_conversation_id =
history.active_conversation_id(self.terminal_view_id)?;
if let Some(conversation) = history.conversation_mut(&parent_conversation_id) {
conversation.set_task_id(self.parent_task_id);
if conversation.agent_name().is_none() {
let name = parent_task
.map(|task| task.display_name().to_string())
.unwrap_or_else(|| "Orchestrator".to_string());
conversation.set_agent_name(name);
}
}
return Some(parent_conversation_id);
}

if let Some(parent_conversation_id) =
history.conversation_id_for_agent_id(&self.parent_task_id.to_string())
{
return Some(parent_conversation_id);
}

let parent_conversation_id =
history.start_new_conversation(self.terminal_view_id, false, true, false, ctx);
history.assign_run_id_for_conversation(
parent_conversation_id,
self.parent_task_id.to_string(),
Some(self.parent_task_id),
self.terminal_view_id,
ctx,
);
if let Some(conversation) = history.conversation_mut(&parent_conversation_id) {
let name = parent_task
.map(|task| task.display_name().to_string())
.unwrap_or_else(|| "Orchestrator".to_string());
conversation.set_agent_name(name);
if let Some(parent_task) = parent_task {
let fallback_title = parent_task.title.trim().to_string();
if !fallback_title.is_empty() {
conversation.set_fallback_display_title(fallback_title);
}
}
}
Some(parent_conversation_id)
})
}

fn register_or_update_child_conversation(
&self,
update: ChildConversationUpdate,
ctx: &mut ModelContext<Self>,
) -> AIConversationId {
let history_handle = BlocklistAIHistoryModel::handle(ctx);
history_handle.update(ctx, |history, ctx| {
let conversation_id = history
.conversation_id_for_agent_id(&update.task_id.to_string())
.or_else(|| {
(update.task_id == self.active_task_id)
.then(|| history.active_conversation_id(self.terminal_view_id))
.flatten()
})
.unwrap_or_else(|| {
history.start_new_child_conversation(
self.terminal_view_id,
update.name.clone(),
update.parent_conversation_id,
update.harness,
ctx,
)
});

history.set_parent_for_conversation(conversation_id, update.parent_conversation_id);
history.set_viewing_shared_session_for_conversation(conversation_id, true);
if let Some(conversation) = history.conversation_mut(&conversation_id) {
conversation.set_task_id(update.task_id);
conversation.set_agent_name(update.name);
if let Some(harness) = update.harness {
conversation.set_orchestration_harness(harness);
}
if !update.fallback_title.is_empty() {
conversation.set_fallback_display_title(update.fallback_title);
}
}
history.update_conversation_status(
self.terminal_view_id,
conversation_id,
update.status,
ctx,
);
conversation_id
})
}

/// Resolves the orchestrator's local conversation id via the view's
/// active conversation, which `on_shared_init` sets on first join.
fn find_parent_conversation_id(&self, ctx: &ModelContext<Self>) -> Option<AIConversationId> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use chrono::Utc;
use warpui::{App, EntityId, SingletonEntity};

use crate::ai::ambient_agents::task::{AgentConfigSnapshot, AmbientAgentTask};
use crate::features::FeatureFlag;
use crate::test_util::{add_window_with_terminal, terminal::initialize_app_for_terminal_view};

// ---- Pure-function tests ----------------------------------------------------
Expand Down Expand Up @@ -164,6 +165,7 @@ fn setup_model(

let model = OrchestrationViewerModel {
parent_task_id,
active_task_id: parent_task_id,
terminal_view_id,
terminal_view: terminal_view.downgrade(),
children: HashMap::new(),
Expand Down Expand Up @@ -229,6 +231,98 @@ fn registers_new_child_conversation() {
});
}

#[test]
fn direct_child_viewer_anchors_pill_tree_on_parent_run() {
let _orchestration_v2 = FeatureFlag::OrchestrationV2.override_enabled(true);

App::test((), |mut app| async move {
let parent = task_id(PARENT_TASK_ID);
let active_child = task_id(CHILD_A_TASK_ID);

initialize_app_for_terminal_view(&mut app);
let terminal_view = add_window_with_terminal(&mut app, None);
let terminal_view_id = terminal_view.id();
let active_child_conversation_id =
BlocklistAIHistoryModel::handle(&app).update(&mut app, |history, ctx| {
let id = history.start_new_conversation(terminal_view_id, false, true, false, ctx);
history.set_active_conversation_id(id, terminal_view_id, ctx);
if let Some(conversation) = history.conversation_mut(&id) {
conversation.set_task_id(active_child);
conversation.set_agent_name("Opened child".to_string());
}
id
});

let model = OrchestrationViewerModel {
parent_task_id: parent,
active_task_id: active_child,
terminal_view_id,
terminal_view: terminal_view.downgrade(),
children: HashMap::new(),
polling_handle: None,
fetch_generation: 0,
};
let model_handle = app.add_model(|_| model);

model_handle.update(&mut app, |model, ctx| {
model.apply_children_fetch(
vec![
make_task(
PARENT_TASK_ID,
AmbientAgentTaskState::InProgress,
"Coordinator",
None,
),
make_task(
CHILD_A_TASK_ID,
AmbientAgentTaskState::InProgress,
"Opened child",
Some(SESSION_A),
),
make_task(
CHILD_B_TASK_ID,
AmbientAgentTaskState::Queued,
"Sibling child",
None,
),
],
ctx,
);
});

let history = BlocklistAIHistoryModel::handle(&app);
history.read(&app, |history, _| {
let parent_conversation_id = history
.conversation_id_for_agent_id(PARENT_TASK_ID)
.expect("parent run should get a local orchestrator conversation");
assert_eq!(
history.active_conversation_id(terminal_view_id),
Some(active_child_conversation_id),
"opening a child run should keep that child selected"
);

let child_ids = history.child_conversation_ids_of(&parent_conversation_id);
assert_eq!(
child_ids.len(),
2,
"parent should own opened child and sibling"
);
assert!(
child_ids.contains(&active_child_conversation_id),
"opened child conversation should be reused in the pill tree"
);
let active_child_conversation = history
.conversation(&active_child_conversation_id)
.expect("opened child conversation exists");
assert_eq!(
active_child_conversation.parent_conversation_id(),
Some(parent_conversation_id)
);
assert_eq!(active_child_conversation.agent_name(), Some("Opened child"));
});
});
}

#[test]
fn skips_parent_task_id_as_child() {
App::test((), |mut app| async move {
Expand Down Expand Up @@ -280,6 +374,7 @@ fn skips_child_when_no_active_parent_conversation() {
// registration should be deferred to the next poll.
let model = OrchestrationViewerModel {
parent_task_id: task_id(PARENT_TASK_ID),
active_task_id: task_id(PARENT_TASK_ID),
terminal_view_id,
terminal_view: terminal_view.downgrade(),
children: HashMap::new(),
Expand Down
11 changes: 11 additions & 0 deletions app/src/terminal/shared_session/viewer/terminal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use warpui::{

use crate::ai::active_agent_views_model::ActiveAgentViewsModel;
use crate::ai::agent::conversation::ConversationStatus;
use crate::ai::agent_conversations_model::AgentConversationsModel;
use crate::ai::blocklist::agent_view::{AgentViewController, AgentViewControllerEvent};
use crate::ai::blocklist::{
BlocklistAIContextEvent, BlocklistAIContextModel, BlocklistAIHistoryEvent,
Expand Down Expand Up @@ -801,11 +802,21 @@ impl TerminalManager {
&& FeatureFlag::OrchestrationViewerPillBar.is_enabled()
&& orchestration_viewer_model.lock().is_none()
{
let orchestration_root_task_id = AgentConversationsModel::as_ref(ctx)
.get_task_data(&task_id)
.and_then(|task| {
task.parent_run_id
.as_deref()
.filter(|id| !id.is_empty())
.and_then(|id| id.parse().ok())
})
.unwrap_or(task_id);
let weak_view_handle_for_orch = weak_view_handle.clone();
let orchestration_viewer_model_slot =
orchestration_viewer_model.clone();
let handle = ctx.add_model(|model_ctx| {
OrchestrationViewerModel::new(
orchestration_root_task_id,
task_id,
terminal_view_id,
weak_view_handle_for_orch,
Expand Down