From 9a958d239f98eb486a45e5cc6edb6f09eac07124 Mon Sep 17 00:00:00 2001 From: Coder666 Date: Wed, 20 May 2026 22:38:42 +0100 Subject: [PATCH 1/2] Support long-running local inference with configurable timeouts and busy-agent queueing Introduces configurable HTTP/tool/runtime timeouts, for inter-agent work, persistent queueing for messages sent while an agent is busy, and safer agent state cleanup so agents do not get stuck in a permanent busy state --- crates/openfang-api/src/channel_bridge.rs | 70 +++ crates/openfang-api/src/routes.rs | 6 +- .../tests/api_integration_test.rs | 2 + .../tests/daemon_lifecycle_test.rs | 2 + crates/openfang-api/tests/load_test.rs | 1 + .../tests/skill_config_api_test.rs | 1 + crates/openfang-channels/src/bridge.rs | 403 +++++++++++++++++- crates/openfang-kernel/src/config_reload.rs | 3 + crates/openfang-kernel/src/heartbeat.rs | 14 +- crates/openfang-kernel/src/kernel.rs | 90 +++- .../openfang-kernel/tests/integration_test.rs | 1 + .../openfang-kernel/tests/multi_agent_test.rs | 1 + .../tests/wasm_agent_integration_test.rs | 1 + .../tests/workflow_integration_test.rs | 1 + crates/openfang-runtime/src/agent_loop.rs | 64 +-- crates/openfang-runtime/src/context_budget.rs | 19 +- .../openfang-runtime/src/drivers/copilot.rs | 2 +- crates/openfang-runtime/src/drivers/mod.rs | 30 +- crates/openfang-runtime/src/drivers/openai.rs | 45 +- crates/openfang-runtime/src/llm_driver.rs | 3 + crates/openfang-types/src/agent.rs | 2 + crates/openfang-types/src/config.rs | 115 +++++ crates/openfang-types/src/error.rs | 8 + docs/configuration.md | 77 ++++ 24 files changed, 878 insertions(+), 83 deletions(-) diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 37ad72921f..6d5e1c55b4 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -57,6 +57,7 @@ use openfang_channels::wecom::WeComAdapter; use openfang_kernel::OpenFangKernel; use openfang_runtime::kernel_handle::KernelHandle; use openfang_types::agent::AgentId; +use uuid::Uuid; use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::{error, info, warn}; @@ -111,6 +112,75 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { Ok(result.response) } + fn queue_max_retries(&self) -> usize { + std::env::var("OPENFANG_QUEUE_MAX_RETRIES") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or_else(|| { + self.kernel.config.channels.queue_max_retries.unwrap_or(300) as usize + }) + } + + fn queue_sleep_secs(&self) -> u64 { + std::env::var("OPENFANG_QUEUE_SLEEP_SECS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or_else(|| { + self.kernel.config.channels.queue_sleep_secs.unwrap_or(2) + }) + } + + async fn is_agent_busy(&self, agent_id: AgentId) -> bool { + self.kernel + .registry + .get(agent_id) + .map(|e| e.state == openfang_types::agent::AgentState::Thinking) + .unwrap_or(false) + } + + async fn get_channel_queue(&self) -> Result { + let nil_id = openfang_types::agent::AgentId(Uuid::nil()); + let val = self + .kernel + .memory + .structured_get(nil_id, "channels_queue") + .map_err(|e| format!("{e}"))?; + match val { + Some(serde_json::Value::String(s)) => Ok(s), + _ => Ok(String::new()), + } + } + + async fn save_channel_queue(&self, queue_json: &str) -> Result<(), String> { + let nil_id = openfang_types::agent::AgentId(Uuid::nil()); + self.kernel + .memory + .structured_set( + nil_id, + "channels_queue", + serde_json::Value::String(queue_json.to_string()), + ) + .map_err(|e| format!("{e}"))?; + Ok(()) + } + + fn queue_enabled(&self) -> bool { + self.kernel.config.channels.queue_enabled.unwrap_or(true) + } + + fn queue_poll_secs(&self) -> u64 { + self.kernel + .config + .channels + .queue_poll_secs + .or_else(|| { + std::env::var("OPENFANG_QUEUE_POLL_SECS") + .ok() + .and_then(|s| s.parse().ok()) + }) + .unwrap_or(30) + } + async fn find_agent_by_name(&self, name: &str) -> Result, String> { Ok(self.kernel.registry.find_by_name(name).map(|e| e.id)) } diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index cebb1f599a..493949e22c 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -212,7 +212,7 @@ pub async fn list_agents(State(state): State>) -> impl IntoRespons }) .unwrap_or(("unknown".to_string(), "unknown".to_string())); - let ready = matches!(e.state, openfang_types::agent::AgentState::Running) + let ready = matches!(e.state, openfang_types::agent::AgentState::Running | openfang_types::agent::AgentState::Thinking) && auth_status != "missing"; // Issue #1026: surface which agents are currently calling the LLM @@ -3546,7 +3546,7 @@ pub async fn prometheus_metrics(State(state): State>) -> impl Into let agents = state.kernel.registry.list(); let active = agents .iter() - .filter(|a| matches!(a.state, openfang_types::agent::AgentState::Running)) + .filter(|a| matches!(a.state, openfang_types::agent::AgentState::Running | openfang_types::agent::AgentState::Thinking)) .count(); out.push_str("# HELP openfang_agents_active Number of active agents.\n"); out.push_str("# TYPE openfang_agents_active gauge\n"); @@ -7780,6 +7780,7 @@ pub async fn set_provider_key( api_key_env: env_var.clone(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let mut guard = state .kernel @@ -7948,6 +7949,7 @@ pub async fn test_provider( }, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; match openfang_runtime::drivers::create_driver(&driver_config) { diff --git a/crates/openfang-api/tests/api_integration_test.rs b/crates/openfang-api/tests/api_integration_test.rs index d31a2ef651..19ab21ad8b 100644 --- a/crates/openfang-api/tests/api_integration_test.rs +++ b/crates/openfang-api/tests/api_integration_test.rs @@ -62,6 +62,7 @@ async fn start_test_server_with_provider( api_key_env: api_key_env.to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() }; @@ -907,6 +908,7 @@ async fn start_test_server_with_auth(api_key: &str) -> TestServer { api_key_env: "OLLAMA_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() }; diff --git a/crates/openfang-api/tests/daemon_lifecycle_test.rs b/crates/openfang-api/tests/daemon_lifecycle_test.rs index c62cba9b7a..5698242277 100644 --- a/crates/openfang-api/tests/daemon_lifecycle_test.rs +++ b/crates/openfang-api/tests/daemon_lifecycle_test.rs @@ -99,6 +99,7 @@ async fn test_full_daemon_lifecycle() { api_key_env: "OLLAMA_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() }; @@ -227,6 +228,7 @@ async fn test_server_immediate_responsiveness() { api_key_env: "OLLAMA_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() }; diff --git a/crates/openfang-api/tests/load_test.rs b/crates/openfang-api/tests/load_test.rs index c0bfc9ae59..05ec27d328 100644 --- a/crates/openfang-api/tests/load_test.rs +++ b/crates/openfang-api/tests/load_test.rs @@ -43,6 +43,7 @@ async fn start_test_server() -> TestServer { api_key_env: "OLLAMA_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() }; diff --git a/crates/openfang-api/tests/skill_config_api_test.rs b/crates/openfang-api/tests/skill_config_api_test.rs index e8a1e60a72..89bf8a21da 100644 --- a/crates/openfang-api/tests/skill_config_api_test.rs +++ b/crates/openfang-api/tests/skill_config_api_test.rs @@ -77,6 +77,7 @@ async fn start_test_server() -> TestServer { api_key_env: "OLLAMA_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() }; diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index 2043aeaa76..f9712f7b86 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -19,6 +19,7 @@ use openfang_types::config::{ChannelOverrides, DmPolicy, GroupPolicy, OutputForm use openfang_types::message::ContentBlock; use std::sync::Arc; use std::time::{Duration, Instant}; +use uuid::Uuid; use tokio::sync::watch; use tracing::{debug, error, info, warn}; @@ -131,6 +132,41 @@ pub trait ChannelBridgeHandle: Send + Sync { /// Spawn an agent by manifest name, returning its ID. async fn spawn_agent_by_name(&self, manifest_name: &str) -> Result; + /// Get the maximum retry count for agent busy queuing. + fn queue_max_retries(&self) -> usize { + 300 + } + + /// Get the sleep duration in seconds between queuing retries. + fn queue_sleep_secs(&self) -> u64 { + 2 + } + + /// Check if an agent is currently busy processing a task. + async fn is_agent_busy(&self, _agent_id: AgentId) -> bool { + false + } + + /// Load the persistent channel queue serialized JSON string. + async fn get_channel_queue(&self) -> Result { + Ok(String::new()) + } + + /// Save the persistent channel queue serialized JSON string. + async fn save_channel_queue(&self, _queue_json: &str) -> Result<(), String> { + Ok(()) + } + + /// Whether persistent channel queuing is enabled. + fn queue_enabled(&self) -> bool { + true + } + + /// Get the persistent queue polling interval in seconds. + fn queue_poll_secs(&self) -> u64 { + 30 + } + /// Transcribe raw audio bytes to text. async fn transcribe_audio( &self, @@ -451,6 +487,29 @@ impl BridgeManager { // 32 is generous — most setups have 1-5 concurrent users. let semaphore = Arc::new(tokio::sync::Semaphore::new(32)); + if handle.queue_enabled() { + let handle = handle.clone(); + let router = router.clone(); + let adapter = adapter.clone(); + let mut shutdown = self.shutdown_rx.clone(); + tokio::spawn(async move { + let poll_secs = handle.queue_poll_secs(); + let mut interval = tokio::time::interval(std::time::Duration::from_secs(poll_secs)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = interval.tick() => { + process_queued_messages(&handle, adapter.as_ref(), &adapter, &router).await; + } + _ = shutdown.changed() => { + break; + } + } + } + }); + } + let task = tokio::spawn(async move { let mut stream = std::pin::pin!(stream); loop { @@ -748,6 +807,184 @@ async fn try_reresolution( } } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct QueuedMessage { + pub id: String, + pub message: ChannelMessage, + pub prefixed_text: String, + pub output_format: OutputFormat, + pub lifecycle_reactions: bool, + pub created_at: chrono::DateTime, +} + +static QUEUE_MUTEX: std::sync::OnceLock> = std::sync::OnceLock::new(); + +fn get_queue_mutex() -> &'static tokio::sync::Mutex<()> { + QUEUE_MUTEX.get_or_init(|| tokio::sync::Mutex::new(())) +} + +async fn read_queue(handle: &dyn ChannelBridgeHandle) -> Vec { + match handle.get_channel_queue().await { + Ok(s) if !s.is_empty() => serde_json::from_str(&s).unwrap_or_default(), + _ => Vec::new(), + } +} + +async fn write_queue(handle: &dyn ChannelBridgeHandle, queue: &[QueuedMessage]) { + if let Ok(json) = serde_json::to_string(queue) { + let _ = handle.save_channel_queue(&json).await; + } +} + +async fn process_queued_messages( + handle: &Arc, + adapter: &dyn ChannelAdapter, + adapter_arc: &Arc, + router: &Arc, +) { + let _guard = get_queue_mutex().lock().await; + let mut queue = read_queue(handle.as_ref()).await; + if queue.is_empty() { + return; + } + + let mut processed_any = false; + for i in 0..queue.len() { + let q_msg = &queue[i]; + if q_msg.message.channel != adapter.channel_type() { + continue; + } + + let agent_id = match q_msg.message.target_agent { + Some(id) => id, + None => { + let channel_key = q_msg.message.channel_id().unwrap_or_else(|| q_msg.message.sender.platform_id.clone()); + match router.resolve_channel_agent(&channel_key).await { + Some(id) => id, + None => { + match handle.find_agent_by_name("assistant").await.ok().flatten() { + Some(id) => id, + None => match handle.list_agents().await.ok().and_then(|a| a.first().map(|(id, _)| *id)) { + Some(id) => id, + None => continue, + } + } + } + } + } + }; + + if handle.is_agent_busy(agent_id).await { + continue; + } + + let popped = queue.remove(i); + write_queue(handle.as_ref(), &queue).await; + + drop(_guard); + + let msg = popped.message; + let prefixed_text = popped.prefixed_text; + let thread_id = msg.thread_id.as_deref(); + let output_format = popped.output_format; + let lifecycle_reactions = popped.lifecycle_reactions; + let msg_id = &msg.platform_message_id; + + info!( + channel = %adapter.name(), + agent_id = %agent_id, + "Picking up persistently queued message: {}", + openfang_types::truncate_str(&prefixed_text, 64) + ); + + let _ = adapter.send_typing(&msg.sender).await; + if lifecycle_reactions { + send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Thinking).await; + } + + let typing_task = spawn_typing_loop(adapter_arc.clone(), msg.sender.clone()); + + let result = handle.send_message(agent_id, &prefixed_text).await; + + typing_task.abort(); + + match result { + Ok(response) => { + if lifecycle_reactions { + send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Done).await; + } + let overrides = handle.channel_overrides(adapter.name()).await; + let text_prefixed = maybe_prefix_response(handle, overrides.as_ref(), agent_id, response).await; + send_response(adapter, &msg.sender, text_prefixed, thread_id, output_format).await; + handle.record_delivery(agent_id, &format!("{:?}", msg.channel), &msg.sender.platform_id, true, None, thread_id).await; + } + Err(e) => { + if lifecycle_reactions { + send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Error).await; + } + let err_msg = sanitize_agent_error(&e); + if !adapter.suppress_error_responses() { + send_response(adapter, &msg.sender, err_msg.clone(), thread_id, output_format).await; + } + handle.record_delivery(agent_id, &format!("{:?}", msg.channel), &msg.sender.platform_id, false, Some(&err_msg), thread_id).await; + } + } + + processed_any = true; + break; + } +} + +async fn send_message_queued( + handle: &Arc, + agent_id: AgentId, + message: &str, +) -> Result { + let mut retries = 0; + let max_retries = handle.queue_max_retries(); + loop { + match handle.send_message(agent_id, message).await { + Err(e) if e.contains("Agent busy") || e.contains("AgentBusy") => { + retries += 1; + if retries >= max_retries { + return Err("Agent is busy and timed out waiting for the queue.".to_string()); + } + #[cfg(test)] + let sleep_dur = std::time::Duration::from_millis(5); + #[cfg(not(test))] + let sleep_dur = std::time::Duration::from_secs(handle.queue_sleep_secs()); + tokio::time::sleep(sleep_dur).await; + } + other => return other, + } + } +} + +async fn send_message_with_blocks_queued( + handle: &Arc, + agent_id: AgentId, + blocks: Vec, +) -> Result { + let mut retries = 0; + let max_retries = handle.queue_max_retries(); + loop { + match handle.send_message_with_blocks(agent_id, blocks.clone()).await { + Err(e) if e.contains("Agent busy") || e.contains("AgentBusy") => { + retries += 1; + if retries >= max_retries { + return Err("Agent is busy and timed out waiting for the queue.".to_string()); + } + #[cfg(test)] + let sleep_dur = std::time::Duration::from_millis(5); + #[cfg(not(test))] + let sleep_dur = std::time::Duration::from_secs(handle.queue_sleep_secs()); + tokio::time::sleep(sleep_dur).await; + } + other => return other, + } + } +} + /// Dispatch a single incoming message — handles bot commands or routes to an agent. /// /// Applies per-channel policies (DM/group filtering, rate limiting, formatting, threading). @@ -1330,8 +1567,48 @@ async fn dispatch_message( text.clone() }; + if handle.queue_enabled() && handle.is_agent_busy(agent_id).await { + let _guard = get_queue_mutex().lock().await; + let mut queue = read_queue(handle.as_ref()).await; + + let is_dup = queue.iter().any(|q| { + q.message.channel == message.channel + && q.message.platform_message_id == message.platform_message_id + }); + + if !is_dup { + let position = queue.iter().filter(|q| { + let q_aid = q.message.target_agent.unwrap_or(agent_id); + q_aid == agent_id + }).count() + 1; + + let queued_item = QueuedMessage { + id: Uuid::new_v4().to_string(), + message: message.clone(), + prefixed_text: prefixed_text.clone(), + output_format, + lifecycle_reactions, + created_at: chrono::Utc::now(), + }; + queue.push(queued_item); + write_queue(handle.as_ref(), &queue).await; + + let notify_msg = format!( + "⏳ Agent is currently busy. Your message has been persistently queued (Position #{position}). I will reply as soon as I'm free!" + ); + + let _ = adapter.send_typing(&message.sender).await; + if lifecycle_reactions { + send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Queued).await; + } + + send_response(adapter, &message.sender, notify_msg, thread_id, output_format).await; + } + return; + } + // Send to agent and relay response - let result = handle.send_message(agent_id, &prefixed_text).await; + let result = send_message_queued(handle, agent_id, &prefixed_text).await; // Stop the typing refresh now that we have a response typing_task.abort(); @@ -1359,7 +1636,7 @@ async fn dispatch_message( // Try re-resolution before reporting error if let Some(new_id) = try_reresolution(&e, &channel_key, handle, router).await { let typing_task2 = spawn_typing_loop(adapter_arc.clone(), message.sender.clone()); - let retry = handle.send_message(new_id, &text).await; + let retry = send_message_queued(handle, new_id, &text).await; typing_task2.abort(); match retry { Ok(response) => { @@ -1784,9 +2061,7 @@ async fn dispatch_with_blocks( // Continuous typing indicator (see spawn_typing_loop doc) let typing_task = spawn_typing_loop(adapter_arc.clone(), message.sender.clone()); - let result = handle - .send_message_with_blocks(agent_id, blocks.clone()) - .await; + let result = send_message_with_blocks_queued(handle, agent_id, blocks.clone()).await; typing_task.abort(); @@ -1823,7 +2098,7 @@ async fn dispatch_with_blocks( // Try re-resolution before reporting error if let Some(new_id) = try_reresolution(&e, &channel_key, handle, router).await { let typing_task2 = spawn_typing_loop(adapter_arc.clone(), message.sender.clone()); - let retry = handle.send_message_with_blocks(new_id, blocks).await; + let retry = send_message_with_blocks_queued(handle, new_id, blocks).await; typing_task2.abort(); match retry { Ok(response) => { @@ -2215,6 +2490,53 @@ mod tests { } } + /// Mock busy kernel handle for testing queue. + struct MockBusyHandle { + agents: Mutex>, + call_count: Mutex, + } + + #[async_trait] + impl ChannelBridgeHandle for MockBusyHandle { + async fn send_message(&self, _agent_id: AgentId, message: &str) -> Result { + let mut count = self.call_count.lock().unwrap(); + *count += 1; + if *count <= 2 { + Err("Agent busy: mock busy".to_string()) + } else { + Ok(format!("Echo: {message}")) + } + } + async fn find_agent_by_name(&self, name: &str) -> Result, String> { + let agents = self.agents.lock().unwrap(); + Ok(agents.iter().find(|(_, n)| n == name).map(|(id, _)| *id)) + } + async fn list_agents(&self) -> Result, String> { + Ok(self.agents.lock().unwrap().clone()) + } + async fn spawn_agent_by_name(&self, _manifest_name: &str) -> Result { + Err("spawn not implemented in mock".to_string()) + } + } + + #[tokio::test] + async fn test_send_message_queued_retries_and_succeeds() { + let agent_id = AgentId::new(); + let mock = Arc::new(MockBusyHandle { + agents: Mutex::new(vec![(agent_id, "busy-agent".to_string())]), + call_count: Mutex::new(0), + }); + + let handle: Arc = mock.clone(); + + // Should return successfully on the third attempt after retrying + let result = send_message_queued(&handle, agent_id, "hello").await; + assert_eq!(result, Ok("Echo: hello".to_string())); + + // The handler should have been called exactly 3 times + assert_eq!(*mock.call_count.lock().unwrap(), 3); + } + #[test] fn test_command_parsing() { // Verify slash commands are parsed correctly from text @@ -2810,4 +3132,73 @@ mod tests { "image/jpeg" ); } + + #[tokio::test] + async fn test_persistent_queuing_helpers() { + struct MockQueueHandle { + queue: Mutex, + } + + #[async_trait] + impl ChannelBridgeHandle for MockQueueHandle { + async fn send_message(&self, _agent_id: AgentId, message: &str) -> Result { + Ok(message.to_string()) + } + async fn find_agent_by_name(&self, _name: &str) -> Result, String> { + Ok(None) + } + async fn list_agents(&self) -> Result, String> { + Ok(vec![]) + } + async fn spawn_agent_by_name(&self, _manifest_name: &str) -> Result { + Err("mock".to_string()) + } + async fn get_channel_queue(&self) -> Result { + Ok(self.queue.lock().unwrap().clone()) + } + async fn save_channel_queue(&self, queue_json: &str) -> Result<(), String> { + let mut q = self.queue.lock().unwrap(); + *q = queue_json.to_string(); + Ok(()) + } + } + + let handle = MockQueueHandle { + queue: Mutex::new(String::new()), + }; + + let q = read_queue(&handle).await; + assert_eq!(q.len(), 0); + + let msg = ChannelMessage { + channel: openfang_types::config::ChannelType::Telegram, + platform_message_id: "msg_123".to_string(), + thread_id: None, + sender: openfang_types::message::ChannelUser { + platform_id: "user_456".to_string(), + display_name: "Test User".to_string(), + openfang_user: None, + }, + content: openfang_types::message::ChannelContent::Text("Hello".to_string()), + target_agent: None, + is_group: false, + metadata: std::collections::HashMap::new(), + }; + + let q_msg = QueuedMessage { + id: "uuid_1".to_string(), + message: msg.clone(), + prefixed_text: "[From: Test User] Hello".to_string(), + output_format: openfang_types::config::OutputFormat::Text, + lifecycle_reactions: false, + created_at: chrono::Utc::now(), + }; + + write_queue(&handle, &[q_msg]).await; + + let q = read_queue(&handle).await; + assert_eq!(q.len(), 1); + assert_eq!(q[0].id, "uuid_1"); + assert_eq!(q[0].prefixed_text, "[From: Test User] Hello"); + } } diff --git a/crates/openfang-kernel/src/config_reload.rs b/crates/openfang-kernel/src/config_reload.rs index 833876bd03..ba7df44523 100644 --- a/crates/openfang-kernel/src/config_reload.rs +++ b/crates/openfang-kernel/src/config_reload.rs @@ -516,6 +516,7 @@ mod tests { api_key_env: String::new(), base_url: None, subprocess_timeout_secs: Some(120), + http_timeout_secs: None, }); b.fallback_providers.push(FallbackProviderConfig { provider: "codex".to_string(), @@ -524,6 +525,7 @@ mod tests { base_url: None, // Operator raises the ceiling for slow Codex turns. subprocess_timeout_secs: Some(900), + http_timeout_secs: None, }); let plan = build_reload_plan(&a, &b); assert!( @@ -548,6 +550,7 @@ mod tests { api_key_env: String::new(), base_url: None, subprocess_timeout_secs: Some(300), + http_timeout_secs: None, }); let plan = build_reload_plan(&a, &b); assert!(!plan.restart_required); diff --git a/crates/openfang-kernel/src/heartbeat.rs b/crates/openfang-kernel/src/heartbeat.rs index 75554b1da6..c665b4e4cb 100644 --- a/crates/openfang-kernel/src/heartbeat.rs +++ b/crates/openfang-kernel/src/heartbeat.rs @@ -133,10 +133,14 @@ impl Default for RecoveryTracker { const IDLE_GRACE_SECS: i64 = 10; /// Reactive agents are healthy while idle between user messages. +/// Thinking agents are healthy while processing long LLM turns. /// /// They should only participate in heartbeat failure detection while a turn is -/// actively running. Otherwise silence is the expected steady state. -pub(crate) fn should_exempt_idle_reactive_agent(entry: &AgentEntry, is_running_task: bool) -> bool { +/// actively running OR if they are genuinely stuck in Running without activity. +pub(crate) fn should_exempt_agent(entry: &AgentEntry, is_running_task: bool) -> bool { + if entry.state == AgentState::Thinking { + return true; + } matches!(entry.manifest.schedule, ScheduleMode::Reactive) && !is_running_task } @@ -396,8 +400,8 @@ mod tests { ); agent.manifest.schedule = ScheduleMode::Reactive; - assert!(should_exempt_idle_reactive_agent(&agent, false)); - assert!(!should_exempt_idle_reactive_agent(&agent, true)); + assert!(should_exempt_agent(&agent, false)); + assert!(!should_exempt_agent(&agent, true)); } #[test] @@ -412,7 +416,7 @@ mod tests { cron: "0 * * * *".to_string(), }; - assert!(!should_exempt_idle_reactive_agent(&agent, false)); + assert!(!should_exempt_agent(&agent, false)); } #[test] diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index 8f59414c97..b6f4d44be0 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -696,6 +696,7 @@ impl OpenFangKernel { }), skip_permissions: true, subprocess_timeout_secs: config.default_model.subprocess_timeout_secs, + http_timeout_secs: config.default_model.http_timeout_secs, }; // Primary driver failure is non-fatal: the dashboard should remain accessible // even if the LLM provider is misconfigured. Users can fix config via dashboard. @@ -722,6 +723,7 @@ impl OpenFangKernel { // Inherit operator's default-model timeout intent: auto-detect // is replacing the *provider*, not the timeout policy. subprocess_timeout_secs: config.default_model.subprocess_timeout_secs, + http_timeout_secs: config.default_model.http_timeout_secs, }; match drivers::create_driver(&auto_config) { Ok(d) => { @@ -771,6 +773,7 @@ impl OpenFangKernel { .or_else(|| config.provider_urls.get(&fb.provider).cloned()), skip_permissions: true, subprocess_timeout_secs: fb.subprocess_timeout_secs, + http_timeout_secs: fb.http_timeout_secs, }; match drivers::create_driver(&fb_config) { Ok(d) => { @@ -1904,13 +1907,29 @@ impl OpenFangKernel { .entry(agent_id) .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) .clone(); - let _guard = lock.lock().await; + let mut _guard = lock.lock().await; + + // Check if the agent is already processing a message (decouple hangs) + let entry = self.registry.get(agent_id).ok_or_else(|| { + KernelError::OpenFang(OpenFangError::AgentNotFound(agent_id.to_string())) + })?; + + if entry.state == AgentState::Thinking { + return Err(KernelError::OpenFang(OpenFangError::AgentBusy(agent_id.to_string()))); + } // Enforce quota before running the agent loop self.scheduler .check_quota(agent_id) .map_err(KernelError::OpenFang)?; + // Set state to Thinking and release lock to allow other messages (e.g. status/cancel) + // to reach the kernel without hanging the transport layer. + self.registry.set_state(agent_id, AgentState::Thinking) + .map_err(|e| KernelError::OpenFang(OpenFangError::Registry(e.to_string())))?; + + drop(_guard); + let entry = self.registry.get(agent_id).ok_or_else(|| { KernelError::OpenFang(OpenFangError::AgentNotFound(agent_id.to_string())) })?; @@ -1931,10 +1950,17 @@ impl OpenFangKernel { content_blocks, sender_id, sender_name, - ) - .await + Some(&self.config.runtime), + ).await }; + // Re-acquire lock to commit state and results + let _guard = lock.lock().await; + + // Reset state back to Running (or Crashed if it failed) + let final_state = if result.is_ok() { AgentState::Running } else { AgentState::Crashed }; + let _ = self.registry.set_state(agent_id, final_state); + match result { Ok(result) => { // Record token usage for quota tracking @@ -2117,6 +2143,7 @@ impl OpenFangKernel { cat.find_model(&entry.manifest.model.model) .map(|m| m.context_window as usize) }); + let ctx_window = ctx_window.or(Some(self.config.compaction.context_window_tokens)); let (tx, rx) = tokio::sync::mpsc::channel::(64); let mut manifest = entry.manifest.clone(); @@ -2378,8 +2405,8 @@ impl OpenFangKernel { ctx_window, Some(&kernel_clone.process_manager), content_blocks, - ) - .await; + Some(&kernel_clone.config.runtime), + ).await; // Drop the phase callback immediately after the streaming loop // completes. It holds a clone of the stream sender (`tx`), which @@ -2637,6 +2664,7 @@ impl OpenFangKernel { content_blocks: Option>, sender_id: Option, sender_name: Option, + _runtime_config: Option<&openfang_types::config::RuntimeConfig>, ) -> KernelResult { // Check metering quota before starting self.metering @@ -2926,6 +2954,7 @@ impl OpenFangKernel { cat.find_model(&manifest.model.model) .map(|m| m.context_window as usize) }); + let ctx_window = ctx_window.or(Some(self.config.compaction.context_window_tokens)); // skill_snapshot was already built above (before tool list and prompt) // with bundled + global + workspace skills. Reuse it for the agent loop. @@ -2969,9 +2998,9 @@ impl OpenFangKernel { ctx_window, Some(&self.process_manager), content_blocks, + Some(&self.config.runtime), ) - .await - .map_err(KernelError::OpenFang)?; + .await.map_err(KernelError::OpenFang)?; // Append new messages to canonical session for cross-channel memory if session.messages.len() > messages_before { @@ -3576,12 +3605,21 @@ impl OpenFangKernel { }); let config = CompactionConfig::default(); - - if !needs_compaction(&session, &config) { + let messages_needed = needs_compaction(&session, &config); + + // Token-based check for high-context sessions + use openfang_runtime::compactor::{estimate_token_count, needs_compaction_by_tokens}; + let estimated_tokens = estimate_token_count(&session.messages, None, None); + let tokens_needed = needs_compaction_by_tokens(estimated_tokens, &config); + + if !messages_needed && !tokens_needed { + let token_threshold = (config.context_window_tokens as f64 * config.token_threshold_ratio) as usize; return Ok(format!( - "No compaction needed ({} messages, threshold {})", + "No compaction needed ({} messages, threshold {}; estimated {} tokens, threshold {})", session.messages.len(), - config.threshold + config.threshold, + estimated_tokens, + token_threshold )); } @@ -4831,7 +4869,7 @@ impl OpenFangKernel { /// publishes `HealthCheckFailed` events for unresponsive agents. fn start_heartbeat_monitor(self: &Arc) { use crate::heartbeat::{ - check_agents, is_quiet_hours, should_exempt_idle_reactive_agent, HeartbeatConfig, + check_agents, is_quiet_hours, should_exempt_agent, HeartbeatConfig, RecoveryTracker, }; @@ -4864,7 +4902,7 @@ impl OpenFangKernel { // Reactive agents are expected to be silent while idle. // Keep them in Running instead of treating normal quiet time // as a crash unless a turn is actively executing. - if should_exempt_idle_reactive_agent( + if should_exempt_agent( &entry, kernel.running_tasks.contains_key(&status.agent_id), ) { @@ -5646,13 +5684,23 @@ impl OpenFangKernel { .find(|fb| &fb.provider == agent_provider) .and_then(|fb| fb.subprocess_timeout_secs) }; - + + let http_timeout = if agent_provider == default_provider { + effective_default.http_timeout_secs + } else { + effective_fallbacks + .iter() + .find(|fb| &fb.provider == agent_provider) + .and_then(|fb| fb.http_timeout_secs) + }; + let driver_config = DriverConfig { provider: agent_provider.clone(), api_key, base_url, skip_permissions: true, subprocess_timeout_secs: primary_timeout, + http_timeout_secs: http_timeout, }; match drivers::create_driver(&driver_config) { @@ -5738,6 +5786,11 @@ impl OpenFangKernel { } else { None }, + http_timeout_secs: if resolved_to_default { + dm.http_timeout_secs + } else { + None + }, }; match drivers::create_driver(&config) { Ok(d) => chain.push((d, strip_provider_prefix(&fb_model_name, &fb_provider))), @@ -5773,6 +5826,7 @@ impl OpenFangKernel { .or_else(|| self.lookup_provider_url(&fb.provider)), skip_permissions: true, subprocess_timeout_secs: fb.subprocess_timeout_secs, + http_timeout_secs: fb.http_timeout_secs, }; match drivers::create_driver(&fb_config) { Ok(d) => { @@ -8854,6 +8908,7 @@ mod tests { api_key_env: String::new(), base_url: None, subprocess_timeout_secs: Some(120), + http_timeout_secs: None, }); let kernel = OpenFangKernel::boot_with_config(config.clone()).expect("kernel boots"); @@ -8980,6 +9035,7 @@ mod tests { api_key_env: String::new(), base_url: None, subprocess_timeout_secs: Some(600), + http_timeout_secs: None, }); let plan = build_reload_plan(&kernel.config, &new_config); @@ -9035,6 +9091,7 @@ mod tests { api_key_env: "GROQ_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, fallback_providers: vec![FallbackProviderConfig { provider: "ollama".to_string(), @@ -9042,6 +9099,7 @@ mod tests { api_key_env: String::new(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }], provider_urls, ..KernelConfig::default() @@ -9101,6 +9159,7 @@ mod tests { api_key_env: "ANTHROPIC_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() }; @@ -9138,6 +9197,7 @@ mod tests { api_key_env: "GROQ_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, channels: ChannelsConfig { telegram: Some(TelegramConfig { @@ -9177,6 +9237,7 @@ mod tests { api_key_env: "GROQ_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, mcp_servers: vec![McpServerConfigEntry { name: "openai-proxy".to_string(), @@ -9217,6 +9278,7 @@ mod tests { api_key_env: "GROQ_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() }; diff --git a/crates/openfang-kernel/tests/integration_test.rs b/crates/openfang-kernel/tests/integration_test.rs index b8d1f76ee0..3a8471c0ce 100644 --- a/crates/openfang-kernel/tests/integration_test.rs +++ b/crates/openfang-kernel/tests/integration_test.rs @@ -20,6 +20,7 @@ fn test_config() -> KernelConfig { api_key_env: "GROQ_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() } diff --git a/crates/openfang-kernel/tests/multi_agent_test.rs b/crates/openfang-kernel/tests/multi_agent_test.rs index 58a3647f20..61aa90724a 100644 --- a/crates/openfang-kernel/tests/multi_agent_test.rs +++ b/crates/openfang-kernel/tests/multi_agent_test.rs @@ -20,6 +20,7 @@ fn test_config() -> KernelConfig { api_key_env: "GROQ_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() } diff --git a/crates/openfang-kernel/tests/wasm_agent_integration_test.rs b/crates/openfang-kernel/tests/wasm_agent_integration_test.rs index d2f465ebf1..845261ecf1 100644 --- a/crates/openfang-kernel/tests/wasm_agent_integration_test.rs +++ b/crates/openfang-kernel/tests/wasm_agent_integration_test.rs @@ -116,6 +116,7 @@ fn test_config(tmp: &tempfile::TempDir) -> KernelConfig { api_key_env: "OLLAMA_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() } diff --git a/crates/openfang-kernel/tests/workflow_integration_test.rs b/crates/openfang-kernel/tests/workflow_integration_test.rs index 6c9d7b03fc..417c9979e4 100644 --- a/crates/openfang-kernel/tests/workflow_integration_test.rs +++ b/crates/openfang-kernel/tests/workflow_integration_test.rs @@ -25,6 +25,7 @@ fn test_config(provider: &str, model: &str, api_key_env: &str) -> KernelConfig { api_key_env: api_key_env.to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }, ..KernelConfig::default() } diff --git a/crates/openfang-runtime/src/agent_loop.rs b/crates/openfang-runtime/src/agent_loop.rs index 615fbfdb73..3a6752ad1e 100644 --- a/crates/openfang-runtime/src/agent_loop.rs +++ b/crates/openfang-runtime/src/agent_loop.rs @@ -66,12 +66,12 @@ fn env_timeout_secs(var: &str) -> Option { /// to `0`. In that case the tool runs with no upper bound, which is what users /// on slow local inference (vLLM on old GPUs) want for Hands and inter-agent /// delegation (issue #1125). -fn tool_timeout_for(tool_name: &str) -> Option { +fn tool_timeout_for(tool_name: &str, runtime_config: Option<&openfang_types::config::RuntimeConfig>) -> Option { let secs = match tool_name { "agent_send" | "agent_spawn" => { - env_timeout_secs("OPENFANG_AGENT_TOOL_TIMEOUT_SECS").unwrap_or(AGENT_TOOL_TIMEOUT_SECS) + runtime_config.map(|c| c.agent_tool_timeout_secs).unwrap_or(env_timeout_secs("OPENFANG_AGENT_TOOL_TIMEOUT_SECS").unwrap_or(AGENT_TOOL_TIMEOUT_SECS)) } - _ => env_timeout_secs("OPENFANG_TOOL_TIMEOUT_SECS").unwrap_or(TOOL_TIMEOUT_SECS), + _ => env_timeout_secs("OPENFANG_TOOL_TIMEOUT_SECS").unwrap_or(runtime_config.map(|c| c.tool_timeout_secs).unwrap_or(TOOL_TIMEOUT_SECS)), }; if secs == 0 { None @@ -312,6 +312,7 @@ pub async fn run_agent_loop( context_window_tokens: Option, process_manager: Option<&crate::process_manager::ProcessManager>, user_content_blocks: Option>, + runtime_config: Option<&openfang_types::config::RuntimeConfig>, ) -> OpenFangResult { info!(agent = %manifest.name, "Starting agent loop"); @@ -481,11 +482,7 @@ pub async fn run_agent_loop( } // Use autonomous config max_iterations if set, else default - let max_iterations = manifest - .autonomous - .as_ref() - .map(|a| a.max_iterations) - .unwrap_or(MAX_ITERATIONS); + let max_iterations = runtime_config.map(|c| c.max_iterations).or(manifest.autonomous.as_ref().map(|a| a.max_iterations)).unwrap_or(MAX_ITERATIONS); // Initialize loop guard — scale circuit breaker for autonomous agents let loop_guard_config = { @@ -500,7 +497,7 @@ pub async fn run_agent_loop( // Build context budget from model's actual context window (or fallback to default) let ctx_window = context_window_tokens.unwrap_or(DEFAULT_CONTEXT_WINDOW); - let context_budget = ContextBudget::new(ctx_window); + let context_budget = ContextBudget::new(ctx_window, runtime_config.map(|c| c.tool_result_budget_ratio).unwrap_or(0.3)); let mut any_tools_executed = false; for iteration in 0..max_iterations { @@ -925,7 +922,7 @@ pub async fn run_agent_loop( // Timeout-wrapped execution. `tool_timeout_for` returns None // when the operator disabled the timeout (issue #1125). - let timeout_opt = tool_timeout_for(&tool_call.name); + let timeout_opt = tool_timeout_for(&tool_call.name, runtime_config); let exec_fut = tool_runner::execute_tool( &tool_call.id, &tool_call.name, @@ -1267,6 +1264,7 @@ async fn call_with_retry( base_url: fb.base_url.clone(), skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let fb_driver = match crate::drivers::create_driver(&fb_config) { Ok(d) => d, @@ -1451,6 +1449,7 @@ async fn stream_with_retry( base_url: fb.base_url.clone(), skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let fb_driver = match crate::drivers::create_driver(&fb_config) { Ok(d) => d, @@ -1540,6 +1539,7 @@ pub async fn run_agent_loop_streaming( context_window_tokens: Option, process_manager: Option<&crate::process_manager::ProcessManager>, user_content_blocks: Option>, + runtime_config: Option<&openfang_types::config::RuntimeConfig>, ) -> OpenFangResult { info!(agent = %manifest.name, "Starting streaming agent loop"); @@ -1700,11 +1700,7 @@ pub async fn run_agent_loop_streaming( } // Use autonomous config max_iterations if set, else default - let max_iterations = manifest - .autonomous - .as_ref() - .map(|a| a.max_iterations) - .unwrap_or(MAX_ITERATIONS); + let max_iterations = runtime_config.map(|c| c.max_iterations).or(manifest.autonomous.as_ref().map(|a| a.max_iterations)).unwrap_or(MAX_ITERATIONS); // Initialize loop guard — scale circuit breaker for autonomous agents let loop_guard_config = { @@ -1719,7 +1715,7 @@ pub async fn run_agent_loop_streaming( // Build context budget from model's actual context window (or fallback to default) let ctx_window = context_window_tokens.unwrap_or(DEFAULT_CONTEXT_WINDOW); - let context_budget = ContextBudget::new(ctx_window); + let context_budget = ContextBudget::new(ctx_window, runtime_config.map(|c| c.tool_result_budget_ratio).unwrap_or(0.3)); let mut any_tools_executed = false; for iteration in 0..max_iterations { @@ -2134,7 +2130,7 @@ pub async fn run_agent_loop_streaming( // Timeout-wrapped execution. `tool_timeout_for` returns None // when the operator disabled the timeout (issue #1125). - let timeout_opt = tool_timeout_for(&tool_call.name); + let timeout_opt = tool_timeout_for(&tool_call.name, runtime_config); let exec_fut = tool_runner::execute_tool( &tool_call.id, &tool_call.name, @@ -3431,7 +3427,7 @@ mod tests { #[test] fn test_dynamic_truncate_short_unchanged() { use crate::context_budget::{truncate_tool_result_dynamic, ContextBudget}; - let budget = ContextBudget::new(200_000); + let budget = ContextBudget::new(200_000, 0.3); let short = "Hello, world!"; assert_eq!(truncate_tool_result_dynamic(short, &budget), short); } @@ -3439,7 +3435,7 @@ mod tests { #[test] fn test_dynamic_truncate_over_limit() { use crate::context_budget::{truncate_tool_result_dynamic, ContextBudget}; - let budget = ContextBudget::new(200_000); + let budget = ContextBudget::new(200_000, 0.3); let long = "x".repeat(budget.per_result_cap() + 10_000); let result = truncate_tool_result_dynamic(&long, &budget); assert!(result.len() <= budget.per_result_cap() + 200); @@ -3450,7 +3446,7 @@ mod tests { fn test_dynamic_truncate_newline_boundary() { use crate::context_budget::{truncate_tool_result_dynamic, ContextBudget}; // Small budget to force truncation - let budget = ContextBudget::new(1_000); + let budget = ContextBudget::new(1_000, 0.3); let content = (0..200) .map(|i| format!("line {i}")) .collect::>() @@ -3491,30 +3487,30 @@ mod tests { Some(Duration::from_secs(600)) ); assert_eq!( - tool_timeout_for("file_read"), + tool_timeout_for("file_read", None), Some(Duration::from_secs(120)) ); assert_eq!( - tool_timeout_for("shell_exec"), + tool_timeout_for("shell_exec", None), Some(Duration::from_secs(120)) ); // Override: set to 0 → timeout disabled. std::env::set_var("OPENFANG_AGENT_TOOL_TIMEOUT_SECS", "0"); std::env::set_var("OPENFANG_TOOL_TIMEOUT_SECS", "0"); - assert_eq!(tool_timeout_for("agent_send"), None); - assert_eq!(tool_timeout_for("agent_spawn"), None); - assert_eq!(tool_timeout_for("file_read"), None); + assert_eq!(tool_timeout_for("agent_send", None), None); + assert_eq!(tool_timeout_for("agent_spawn", None), None); + assert_eq!(tool_timeout_for("file_read", None), None); // Override: custom positive values are honored verbatim. std::env::set_var("OPENFANG_AGENT_TOOL_TIMEOUT_SECS", "1800"); std::env::set_var("OPENFANG_TOOL_TIMEOUT_SECS", "300"); assert_eq!( - tool_timeout_for("agent_send"), + tool_timeout_for("agent_send", None), Some(Duration::from_secs(1800)) ); assert_eq!( - tool_timeout_for("file_read"), + tool_timeout_for("file_read", None), Some(Duration::from_secs(300)) ); @@ -3526,7 +3522,7 @@ mod tests { Some(Duration::from_secs(600)) ); assert_eq!( - tool_timeout_for("file_read"), + tool_timeout_for("file_read", None), Some(Duration::from_secs(120)) ); @@ -3821,6 +3817,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Loop should complete without error"); @@ -3874,6 +3871,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Loop should complete without error"); @@ -3929,6 +3927,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Loop should complete without error"); @@ -3982,6 +3981,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Loop should complete without error"); @@ -4028,6 +4028,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Streaming loop should complete without error"); @@ -4152,6 +4153,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Loop should recover via retry"); @@ -4199,6 +4201,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Loop should complete with fallback"); @@ -4254,6 +4257,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Streaming loop should complete without error"); @@ -5230,6 +5234,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Agent loop should complete"); @@ -5300,6 +5305,7 @@ mod tests { None, None, None, + None, // runtime_config ) .await .expect("Agent loop should recover nested XML tool calls"); @@ -5372,6 +5378,7 @@ mod tests { None, None, None, // user_content_blocks + None, // runtime_config ) .await .expect("Normal loop should complete"); @@ -5435,6 +5442,7 @@ mod tests { None, // context_window_tokens None, // process_manager None, // user_content_blocks + None, // runtime_config ) .await .expect("Streaming loop should complete"); diff --git a/crates/openfang-runtime/src/context_budget.rs b/crates/openfang-runtime/src/context_budget.rs index 14cd738312..316c7c8c76 100644 --- a/crates/openfang-runtime/src/context_budget.rs +++ b/crates/openfang-runtime/src/context_budget.rs @@ -16,6 +16,8 @@ pub struct ContextBudget { /// Total context window size in tokens. pub context_window_tokens: usize, /// Estimated characters per token for tool results (denser content). + /// Maximum character budget for a single tool result (as fraction of context window). + pub tool_result_budget_ratio: f64, pub tool_chars_per_token: f64, /// Estimated characters per token for general content. pub general_chars_per_token: f64, @@ -23,8 +25,9 @@ pub struct ContextBudget { impl ContextBudget { /// Create a new budget from a context window size. - pub fn new(context_window_tokens: usize) -> Self { + pub fn new(context_window_tokens: usize, tool_result_budget_ratio: f64) -> Self { Self { + tool_result_budget_ratio, context_window_tokens, tool_chars_per_token: 2.0, general_chars_per_token: 4.0, @@ -33,7 +36,7 @@ impl ContextBudget { /// Per-result character cap: 30% of context window converted to chars. pub fn per_result_cap(&self) -> usize { - let tokens_for_tool = (self.context_window_tokens as f64 * 0.30) as usize; + let tokens_for_tool = (self.context_window_tokens as f64 * self.tool_result_budget_ratio) as usize; (tokens_for_tool as f64 * self.tool_chars_per_token) as usize } @@ -52,7 +55,7 @@ impl ContextBudget { impl Default for ContextBudget { fn default() -> Self { - Self::new(200_000) + Self::new(200_000, 0.3) } } @@ -243,7 +246,7 @@ mod tests { #[test] fn test_small_model_budget() { - let budget = ContextBudget::new(8_000); + let budget = ContextBudget::new(8_000, 0.3); // 30% of 8K * 2.0 = 4800 chars assert_eq!(budget.per_result_cap(), 4_800); } @@ -257,7 +260,7 @@ mod tests { #[test] fn test_truncate_breaks_at_newline() { - let budget = ContextBudget::new(100); // very small: cap = 60 chars + let budget = ContextBudget::new(100, 0.3); // very small: cap = 60 chars let content = "line1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\nline9\nline10\nline11\nline12"; let result = truncate_tool_result_dynamic(content, &budget); @@ -279,7 +282,7 @@ mod tests { #[test] fn test_context_guard_compacts_oldest() { // Use tiny budget to trigger compaction - let budget = ContextBudget::new(100); // headroom = 75% of 100 * 2.0 = 150 chars + let budget = ContextBudget::new(100, 0.3); // headroom = 75% of 100 * 2.0 = 150 chars let big_result = "x".repeat(500); let mut messages = vec![ Message { @@ -318,7 +321,7 @@ mod tests { #[test] fn test_truncate_tool_result_multibyte_chinese() { // Tiny budget: cap = 30% of 100 * 2.0 = 60 bytes - let budget = ContextBudget::new(100); + let budget = ContextBudget::new(100, 0.3); // Each Chinese char is 3 bytes in UTF-8; 100 chars = 300 bytes let content: String = "\u{4f60}\u{597d}\u{4e16}\u{754c}".repeat(25); assert_eq!(content.len(), 300); @@ -341,7 +344,7 @@ mod tests { #[test] fn test_context_guard_multibyte_tool_results() { - let budget = ContextBudget::new(100); + let budget = ContextBudget::new(100, 0.3); // Chinese text: 500 chars * 3 bytes = 1500 bytes let big_chinese: String = "\u{4e2d}\u{6587}\u{6d4b}\u{8bd5}\u{6570}\u{636e}".repeat(83); let mut messages = vec![Message { diff --git a/crates/openfang-runtime/src/drivers/copilot.rs b/crates/openfang-runtime/src/drivers/copilot.rs index 9c1f15f341..35d3e2148c 100644 --- a/crates/openfang-runtime/src/drivers/copilot.rs +++ b/crates/openfang-runtime/src/drivers/copilot.rs @@ -620,7 +620,7 @@ impl CopilotDriver { } else { ct.base_url.clone() }; - super::openai::OpenAIDriver::new(ct.token.to_string(), base_url).with_extra_headers(vec![ + super::openai::OpenAIDriver::new(ct.token.to_string(), base_url, None).with_extra_headers(vec![ ("Editor-Version".to_string(), "vscode/1.96.0".to_string()), ( "Editor-Plugin-Version".to_string(), diff --git a/crates/openfang-runtime/src/drivers/mod.rs b/crates/openfang-runtime/src/drivers/mod.rs index ca1e0701c7..e160682c8a 100644 --- a/crates/openfang-runtime/src/drivers/mod.rs +++ b/crates/openfang-runtime/src/drivers/mod.rs @@ -382,7 +382,8 @@ pub fn create_driver(config: &DriverConfig) -> Result, LlmErr .base_url .clone() .unwrap_or_else(|| OPENAI_BASE_URL.to_string()); - return Ok(Arc::new(openai::OpenAIDriver::new(api_key, base_url))); + let timeout = config.http_timeout_secs.map(std::time::Duration::from_secs); + return Ok(Arc::new(openai::OpenAIDriver::new(api_key, base_url, timeout))); } // Claude Code CLI — subprocess-based, no API key needed @@ -457,7 +458,8 @@ pub fn create_driver(config: &DriverConfig) -> Result, LlmErr https://{resource}.openai.azure.com/openai/deployments" .to_string(), })?; - return Ok(Arc::new(openai::OpenAIDriver::new_azure(api_key, base_url))); + let timeout = config.http_timeout_secs.map(std::time::Duration::from_secs); + return Ok(Arc::new(openai::OpenAIDriver::new_azure(api_key, base_url, timeout))); } // Vertex AI — uses Google Cloud OAuth with service account credentials. @@ -546,7 +548,8 @@ pub fn create_driver(config: &DriverConfig) -> Result, LlmErr .or_else(|| local_provider_url_from_env(provider)) .unwrap_or_else(|| defaults.base_url.to_string()); - return Ok(Arc::new(openai::OpenAIDriver::new(api_key, base_url))); + let timeout = config.http_timeout_secs.map(std::time::Duration::from_secs); + return Ok(Arc::new(openai::OpenAIDriver::new(api_key, base_url, timeout))); } // Unknown provider — if base_url is set, treat as custom OpenAI-compatible. @@ -558,9 +561,11 @@ pub fn create_driver(config: &DriverConfig) -> Result, LlmErr let env_var = format!("{}_API_KEY", provider.to_uppercase().replace('-', "_")); std::env::var(&env_var).unwrap_or_default() }); + let timeout = config.http_timeout_secs.map(std::time::Duration::from_secs); return Ok(Arc::new(openai::OpenAIDriver::new( api_key, base_url.clone(), + timeout, ))); } @@ -779,6 +784,7 @@ mod tests { base_url: Some("http://localhost:9999/v1".to_string()), skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!(driver.is_ok()); @@ -792,6 +798,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!(driver.is_err()); @@ -913,6 +920,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!( @@ -931,6 +939,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!(driver.is_err()); @@ -948,6 +957,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!( @@ -967,6 +977,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!(driver.is_err()); @@ -984,6 +995,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let result = create_driver(&config); assert!(result.is_err()); @@ -1012,6 +1024,7 @@ mod tests { base_url: Some("https://api.example.com/v1".to_string()), skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!(driver.is_ok()); @@ -1040,6 +1053,7 @@ mod tests { base_url: Some("https://myresource.openai.azure.com/openai/deployments".to_string()), skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!(driver.is_ok(), "Azure driver with key + URL should succeed"); @@ -1053,6 +1067,7 @@ mod tests { base_url: Some("https://myresource.openai.azure.com/openai/deployments".to_string()), skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let result = create_driver(&config); assert!(result.is_err(), "Azure driver without key should error"); @@ -1072,6 +1087,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let result = create_driver(&config); assert!(result.is_err(), "Azure driver without URL should error"); @@ -1091,6 +1107,7 @@ mod tests { base_url: Some("https://myresource.openai.azure.com/openai/deployments".to_string()), skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!( @@ -1116,6 +1133,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; // Should succeed because api_key is provided let driver = create_driver(&config); @@ -1135,6 +1153,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!(driver.is_ok(), "claude-code driver should construct"); @@ -1150,6 +1169,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: Some(480), + http_timeout_secs: None, }; let driver = create_driver(&config); assert!( @@ -1170,6 +1190,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: Some(120), + http_timeout_secs: None, }; let driver = create_driver(&config); std::env::remove_var("OPENFANG_SUBPROCESS_TIMEOUT_SECS"); @@ -1189,6 +1210,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: Some(420), + http_timeout_secs: None, }; let driver = create_driver(&config); std::env::remove_var("OPENFANG_SUBPROCESS_TIMEOUT_SECS"); @@ -1279,6 +1301,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!( @@ -1303,6 +1326,7 @@ mod tests { base_url: None, skip_permissions: true, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let driver = create_driver(&config); assert!(driver.is_ok(), "lmstudio default should construct"); diff --git a/crates/openfang-runtime/src/drivers/openai.rs b/crates/openfang-runtime/src/drivers/openai.rs index 73210f2757..3d1943a74a 100644 --- a/crates/openfang-runtime/src/drivers/openai.rs +++ b/crates/openfang-runtime/src/drivers/openai.rs @@ -27,15 +27,16 @@ pub struct OpenAIDriver { } impl OpenAIDriver { - /// Create a new OpenAI-compatible driver. - pub fn new(api_key: String, base_url: String) -> Self { + /// Create a new OpenAI-compatible driver with optional timeout. + pub fn new(api_key: String, base_url: String, timeout: Option) -> Self { + let mut builder = reqwest::Client::builder().user_agent(crate::USER_AGENT); + if let Some(t) = timeout { + builder = builder.timeout(t); + } Self { api_key: Zeroizing::new(api_key), base_url, - client: reqwest::Client::builder() - .user_agent(crate::USER_AGENT) - .build() - .unwrap_or_default(), + client: builder.build().unwrap_or_default(), extra_headers: Vec::new(), azure_mode: false, } @@ -46,14 +47,15 @@ impl OpenAIDriver { /// Azure uses a deployment-based URL scheme and `api-key` header instead of /// `Authorization: Bearer`. The `base_url` should be the deployments root, /// e.g. `https://{resource}.openai.azure.com/openai/deployments`. - pub fn new_azure(api_key: String, base_url: String) -> Self { + pub fn new_azure(api_key: String, base_url: String, timeout: Option) -> Self { + let mut builder = reqwest::Client::builder().user_agent(crate::USER_AGENT); + if let Some(t) = timeout { + builder = builder.timeout(t); + } Self { api_key: Zeroizing::new(api_key), base_url, - client: reqwest::Client::builder() - .user_agent(crate::USER_AGENT) - .build() - .unwrap_or_default(), + client: builder.build().unwrap_or_default(), extra_headers: Vec::new(), azure_mode: true, } @@ -1707,7 +1709,7 @@ mod tests { #[test] fn test_openai_driver_creation() { - let driver = OpenAIDriver::new("test-key".to_string(), "http://localhost".to_string()); + let driver = OpenAIDriver::new("test-key".to_string(), "http://localhost".to_string(), None); assert_eq!(driver.api_key.as_str(), "test-key"); } @@ -1958,7 +1960,7 @@ mod tests { /// the upstream server is pre- or post-vLLM 0.19. #[test] fn test_assemble_emits_both_reasoning_fields_for_vllm_compat() { - let driver = OpenAIDriver::new("test".to_string(), "http://localhost:8000/v1".to_string()); + let driver = OpenAIDriver::new("test".to_string(), "http://localhost:8000/v1".to_string(), None); let blocks = vec![ ContentBlock::Thinking { thinking: "MARKER-vllm-019".to_string(), @@ -1993,7 +1995,8 @@ mod tests { /// change in #1157. #[test] fn test_assemble_no_reasoning_fields_for_plain_model() { - let driver = OpenAIDriver::new("test".to_string(), "https://api.openai.com/v1".to_string()); + let driver = OpenAIDriver::new("test".to_string(), "https://api.openai.com/v1".to_string(), + None); let blocks = vec![ContentBlock::Text { text: "hi".to_string(), provider_metadata: None, @@ -2013,7 +2016,7 @@ mod tests { #[test] fn test_assemble_moonshot_keeps_legacy_field_only() { let driver = - OpenAIDriver::new("test".to_string(), "https://api.moonshot.cn/v1".to_string()); + OpenAIDriver::new("test".to_string(), "https://api.moonshot.cn/v1".to_string(), None); let blocks = vec![ContentBlock::ToolUse { id: "call_1".to_string(), name: "search".to_string(), @@ -2039,6 +2042,7 @@ mod tests { let driver = OpenAIDriver::new_azure( "test-key".to_string(), "https://myresource.openai.azure.com/openai/deployments".to_string(), + None, ); assert!(driver.azure_mode); } @@ -2048,6 +2052,7 @@ mod tests { let driver = OpenAIDriver::new( "test-key".to_string(), "https://api.openai.com/v1".to_string(), + None, ); assert!(!driver.azure_mode); } @@ -2057,6 +2062,7 @@ mod tests { let driver = OpenAIDriver::new_azure( "test-key".to_string(), "https://myresource.openai.azure.com/openai/deployments".to_string(), + None, ); let url = driver.chat_url("my-gpt4o-deployment"); assert_eq!( @@ -2070,6 +2076,7 @@ mod tests { let driver = OpenAIDriver::new_azure( "test-key".to_string(), "https://myresource.openai.azure.com/openai/deployments/".to_string(), + None, ); let url = driver.chat_url("gpt-4o"); assert_eq!( @@ -2083,6 +2090,7 @@ mod tests { let driver = OpenAIDriver::new( "test-key".to_string(), "https://api.openai.com/v1".to_string(), + None, ); let url = driver.chat_url("gpt-4o"); assert_eq!(url, "https://api.openai.com/v1/chat/completions"); @@ -2094,6 +2102,7 @@ mod tests { let driver = OpenAIDriver::new( "test-key".to_string(), "https://api.moonshot.ai/v1".to_string(), + None, ); // kimi-k2.5 must go to the .cn endpoint let url = driver.chat_url("kimi-k2.5"); @@ -2118,6 +2127,7 @@ mod tests { let driver = OpenAIDriver::new( "test".to_string(), "https://api.minimax.chat/v1".to_string(), + None, ); let blocks = vec![ ContentBlock::Thinking { @@ -2151,6 +2161,7 @@ mod tests { let driver = OpenAIDriver::new( "test".to_string(), "https://api.deepseek.com/v1".to_string(), + None, ); let blocks = vec![ ContentBlock::Thinking { @@ -2183,7 +2194,8 @@ mod tests { /// assistant message — preserve the legacy shape. #[test] fn test_assemble_assistant_no_thinking_is_plain() { - let driver = OpenAIDriver::new("test".to_string(), "https://api.openai.com/v1".to_string()); + let driver = OpenAIDriver::new("test".to_string(), "https://api.openai.com/v1".to_string(), + None); let blocks = vec![ContentBlock::Text { text: "Hi.".to_string(), provider_metadata: None, @@ -2233,6 +2245,7 @@ mod tests { let driver = OpenAIDriver::new( "test".to_string(), "https://api.deepseek.com/v1".to_string(), + None, ); let outbound = assemble_assistant_message(&content, "deepseek-reasoner", &driver); // The reasoning_content field must round-trip verbatim. diff --git a/crates/openfang-runtime/src/llm_driver.rs b/crates/openfang-runtime/src/llm_driver.rs index 500192c839..02a651cffd 100644 --- a/crates/openfang-runtime/src/llm_driver.rs +++ b/crates/openfang-runtime/src/llm_driver.rs @@ -210,6 +210,9 @@ pub struct DriverConfig { /// opt in to this field individually. #[serde(default)] pub subprocess_timeout_secs: Option, + /// Global HTTP client timeout in seconds. + #[serde(default)] + pub http_timeout_secs: Option, } fn default_skip_permissions() -> bool { diff --git a/crates/openfang-types/src/agent.rs b/crates/openfang-types/src/agent.rs index 5589b1e35e..5f6d535aff 100644 --- a/crates/openfang-types/src/agent.rs +++ b/crates/openfang-types/src/agent.rs @@ -183,6 +183,8 @@ pub enum AgentState { Terminated, /// Agent crashed and is awaiting recovery. Crashed, + /// Agent is actively processing an LLM request (long-running turn). + Thinking, } /// Permission-based operational mode for an agent. diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index 25df9b0059..823401b007 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -491,6 +491,9 @@ pub struct FallbackProviderConfig { /// over this field at driver-construction time. #[serde(default)] pub subprocess_timeout_secs: Option, + /// Global HTTP client timeout in seconds for this fallback. + #[serde(default)] + pub http_timeout_secs: Option, } /// Text-to-speech configuration. @@ -1296,7 +1299,12 @@ pub struct KernelConfig { /// github_token = "ghp_..." /// default_branch = "develop" /// ``` + /// Agent runtime configuration. + #[serde(default)] + pub runtime: RuntimeConfig, + /// Session compaction configuration. #[serde(default)] + pub compaction: CompactionConfig, pub skills: HashMap>, } @@ -1540,6 +1548,8 @@ impl Default for KernelConfig { workflows_dir: None, heartbeat: HeartbeatSettings::default(), skills: HashMap::new(), + runtime: RuntimeConfig::default(), + compaction: CompactionConfig::default(), } } } @@ -1696,6 +1706,8 @@ pub struct DefaultModelConfig { /// `OPENFANG_SUBPROCESS_TIMEOUT_SECS` env var, if set, wins over this /// field at driver-construction time. pub subprocess_timeout_secs: Option, + /// Global HTTP client timeout in seconds for the default model. + pub http_timeout_secs: Option, } impl Default for DefaultModelConfig { @@ -1706,6 +1718,7 @@ impl Default for DefaultModelConfig { api_key_env: "ANTHROPIC_API_KEY".to_string(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, } } } @@ -1909,6 +1922,16 @@ pub struct ChannelsConfig { pub wecom: Option, /// MQTT pub/sub configuration (None = disabled). pub mqtt: Option, + /// Maximum retries for agent busy queuing. + pub queue_max_retries: Option, + /// Sleep duration in seconds between agent busy retries. + pub queue_sleep_secs: Option, + /// Enable persistent queueing when an agent is busy. + #[serde(default)] + pub queue_enabled: Option, + /// Configurable persistent queue polling interval in seconds. + #[serde(default)] + pub queue_poll_secs: Option, } /// Telegram channel adapter configuration. @@ -4334,6 +4357,7 @@ mod tests { api_key_env: String::new(), base_url: None, subprocess_timeout_secs: None, + http_timeout_secs: None, }; let json = serde_json::to_string(&fb).unwrap(); let back: FallbackProviderConfig = serde_json::from_str(&json).unwrap(); @@ -4699,3 +4723,94 @@ shell_env_passthrough = ["*"] assert_eq!(policy.shell_env_passthrough, vec!["*"]); } } +/// Configuration for session compaction. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct CompactionConfig { + /// Compact when session message count exceeds this. + pub threshold: usize, + /// Number of recent messages to keep verbatim (not summarized). + pub keep_recent: usize, + /// Maximum tokens for the summary generation. + pub max_summary_tokens: u32, + /// Base ratio of messages to process per chunk (0.0-1.0). + pub base_chunk_ratio: f64, + /// Minimum chunk ratio (floor for adaptive computation). + pub min_chunk_ratio: f64, + /// Safety margin multiplier for token estimation inaccuracy. + pub safety_margin: f64, + /// Overhead tokens reserved for summarization prompt itself. + pub summarization_overhead_tokens: u32, + /// Maximum input chars per summarization chunk. + pub max_chunk_chars: usize, + /// Maximum retry attempts for summarization. + pub max_retries: u32, + /// Trigger compaction when estimated tokens exceed this fraction of context_window_tokens. + pub token_threshold_ratio: f64, + /// Model context window size in tokens. + pub context_window_tokens: usize, +} + +impl Default for CompactionConfig { + fn default() -> Self { + Self { + threshold: 30, + keep_recent: 10, + max_summary_tokens: 1024, + base_chunk_ratio: 0.4, + min_chunk_ratio: 0.15, + safety_margin: 1.2, + summarization_overhead_tokens: 4096, + max_chunk_chars: 80_000, + max_retries: 3, + token_threshold_ratio: 0.7, + context_window_tokens: 200_000, + } + } +} + +/// Configuration for the agent runtime. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct RuntimeConfig { + /// Maximum iterations per agent turn. + pub max_iterations: u32, + /// Maximum retries for LLM calls. + pub max_retries: u32, + /// Base delay for retries in milliseconds. + pub base_retry_delay_ms: u64, + /// Timeout for local tool execution in seconds. + pub tool_timeout_secs: u64, + /// Timeout for inter-agent tool calls (A2A) in seconds. + pub agent_tool_timeout_secs: u64, + /// Maximum model continuations per turn. + pub max_continuations: u32, + /// Maximum recursion depth for agent-to-agent calls. + pub max_agent_call_depth: u32, + /// Browser CDP connection timeout in seconds. + pub browser_connect_timeout_secs: u64, + /// Browser CDP command timeout in seconds. + pub browser_command_timeout_secs: u64, + /// MCP request timeout in seconds. + pub mcp_timeout_secs: u64, + /// Maximum character budget for a single tool result (as fraction of context window). + pub tool_result_budget_ratio: f64, +} + +impl Default for RuntimeConfig { + fn default() -> Self { + Self { + max_iterations: 50, + max_retries: 3, + base_retry_delay_ms: 1000, + tool_timeout_secs: 120, + agent_tool_timeout_secs: 600, + max_continuations: 5, + max_agent_call_depth: 5, + browser_connect_timeout_secs: 15, + browser_command_timeout_secs: 30, + mcp_timeout_secs: 30, + tool_result_budget_ratio: 0.3, + } + } +} diff --git a/crates/openfang-types/src/error.rs b/crates/openfang-types/src/error.rs index 4f6be01422..77313da927 100644 --- a/crates/openfang-types/src/error.rs +++ b/crates/openfang-types/src/error.rs @@ -98,6 +98,14 @@ pub enum OpenFangError { /// Invalid user input. #[error("Invalid input: {0}")] InvalidInput(String), + + /// The agent is currently busy processing another message. + #[error("Agent busy: {0}")] + AgentBusy(String), + + /// An error occurred in the agent registry. + #[error("Registry error: {0}")] + Registry(String), } /// Alias for Result with OpenFangError. diff --git a/docs/configuration.md b/docs/configuration.md index a233c99d59..5cf79a045f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -104,6 +104,7 @@ provider = "anthropic" model = "claude-sonnet-4-20250514" api_key_env = "ANTHROPIC_API_KEY" # base_url = "https://api.anthropic.com" # Optional override +# http_timeout_secs = 600 # timeout for long inference turns # --- Fallback Providers --- [[fallback_providers]] @@ -159,6 +160,13 @@ max_chars = 50000 max_response_bytes = 10485760 # 10 MB timeout_secs = 30 readability = true + +# --- Runtime Limits --- +[runtime] +max_iterations = 50 # Max steps per turn +tool_timeout_secs = 120 # Max seconds for regular tools +agent_tool_timeout_secs = 600 # Max seconds for agent_send/agent_spawn +max_continuations = 5 # Max LLM continuations per turn # --- MCP Servers --- [[mcp_servers]] @@ -333,6 +341,7 @@ api_key_env = "ANTHROPIC_API_KEY" | `model` | string | `"claude-sonnet-4-20250514"` | Model identifier. Aliases like `sonnet`, `haiku`, `gpt-4o`, `gemini-flash` are resolved by the model catalog. | | `api_key_env` | string | `"ANTHROPIC_API_KEY"` | Name of the environment variable holding the API key. The actual key is read from this env var at runtime, never stored in config. | | `base_url` | string or null | `null` | Override the API base URL. Useful for proxies or self-hosted endpoints. When `null`, the provider's default URL from the model catalog is used. | +| `http_timeout_secs` | u64 or null | `null` | HTTP client timeout in seconds for long-running inference turns. When `null`, the driver default is used (usually 300s). For long-form generation (long-running agent sessions, etc.), bump this to a larger value (example: 1800 for 30 mins), and ensure any proxy or container configurations are set with an equal or higher timeout value. | --- @@ -518,12 +527,49 @@ readability = true --- +### `[runtime]` + +Configures execution limits and timeouts for the agent loop and tool execution. + +```toml +[runtime] +max_iterations = 50 +tool_timeout_secs = 120 +agent_tool_timeout_secs = 600 +max_continuations = 5 +max_agent_call_depth = 5 +``` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `max_iterations` | u32 | `50` | Maximum number of tool-call iterations per agent turn before forcing a stop. | +| `tool_timeout_secs` | u64 | `120` | Maximum execution time in seconds for regular tools (filesystem, web, etc.). | +| `agent_tool_timeout_secs` | u64 | `600` | Maximum execution time in seconds for inter-agent tools (`agent_send`, `agent_spawn`). These involve full remote agent turns and often require significantly longer timeouts (3 hours by default). | +| `max_continuations` | u32 | `5` | Maximum number of LLM continuations allowed when a model hits its output token limit. | +| `max_agent_call_depth` | u32 | `5` | Maximum recursion depth for agent-to-agent calls to prevent infinite loops. | +| `mcp_timeout_secs` | u64 | `30` | Default timeout for MCP tool requests. | + +--- + ### `[channels]` All 40 channel adapters are configured under `[channels.]`. Each channel is `Option` -- omitting the section disables the adapter entirely. Including the section header (even empty) enables it with default values. Every channel config includes a `default_agent` field (optional agent name to route messages to) and an `overrides` sub-table (see [Channel Overrides](#channel-overrides)). +**Global Channel Queue Settings:** + +These settings control the asynchronous queuing mechanism that serializes concurrent incoming messages to the same agent when they are in a "Thinking" state, preventing concurrent turns from blocking or causing "Agent busy" errors. + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `queue_max_retries` | u64 | `300` | Maximum number of retry attempts for queuing when the agent is busy/thinking before giving up and returning a timeout error. | +| `queue_sleep_secs` | u64 | `2` | Number of seconds to pause between polling retry attempts. | +| `queue_enabled` | bool | `true` | Enable SQLite-backed persistent queuing when an agent is busy/thinking. When enabled, incoming messages are saved to SQLite and processed sequentially as soon as the agent becomes free. | +| `queue_poll_secs` | u64 | `30` | Configurable polling interval in seconds for the background queue processor loop. | + +These global settings can also be overridden via the environment variables `OPENFANG_QUEUE_MAX_RETRIES`, `OPENFANG_QUEUE_SLEEP_SECS`, `OPENFANG_QUEUE_ENABLED`, and `OPENFANG_QUEUE_POLL_SECS`. + #### `[channels.telegram]` ```toml @@ -1292,6 +1338,7 @@ api_key_env = "GROQ_API_KEY" | `model` | string | `""` | Model identifier for this provider. | | `api_key_env` | string | `""` | Env var name for the API key. Empty for local providers (ollama, vllm, lmstudio). | | `base_url` | string or null | `null` | Base URL override. Uses catalog default if null. | +| `http_timeout_secs` | u64 or null | `null` | HTTP client timeout in seconds for this fallback. | --- @@ -1601,3 +1648,33 @@ Configured in agent manifests via `AutonomousConfig`: | `max_restarts` | `10` | Maximum automatic restarts before permanent stop. | | `heartbeat_interval_secs` | `30` | Seconds between heartbeat health checks. | | `heartbeat_channel` | `null` | Channel to send heartbeat status to (e.g., `"telegram"`). | + +--- + +## Local LLM Operations: Cost Estimation + +For local hardware setups (such as multi-GPU local inference servers), OpenFang supports power-aware costing models to track token expenditures alongside external cloud API provider costs. + +### Cost Estimation Config + +To reflect empirical energy usage in your local OpenFang token billing telemetry, you can configure model cost values per million tokens in `custom_models.json` (located in `~/.openfang/custom_models.json` or `/data/custom_models.json` inside Docker): + +```json +[ + { + "id": "qwen2.5-72b-instruct", + "display_name": "Local Qwen 2.5 72B (vLLM)", + "provider": "vllm", + "tier": "local", + "context_window": 32768, + "max_output_tokens": 4096, + "input_cost_per_m": 0.92, + "output_cost_per_m": 0.92, + "supports_tools": true, + "supports_vision": false, + "supports_streaming": true + } +] +``` + +* **Calculation Basis:** The local model pricing can be calculated by factoring the hosting server's average active wattage draw (e.g. 500W) against your local electricity utility tariff and average generation speed. This provides a ground-truth cost per million tokens for accurate resource accounting. From 02792f322419ffc63764b725def067e7d1f6afed Mon Sep 17 00:00:00 2001 From: Coder666 Date: Wed, 20 May 2026 11:04:50 +0100 Subject: [PATCH 2/2] fix: harden agent busy queue handling fix: align queued channel routing with bridge router fix: validate channel queue timing config --- crates/openfang-api/src/channel_bridge.rs | 29 ++- crates/openfang-channels/src/bridge.rs | 245 ++++++++++++++-------- crates/openfang-kernel/src/kernel.rs | 53 ++++- crates/openfang-runtime/src/agent_loop.rs | 33 ++- 4 files changed, 243 insertions(+), 117 deletions(-) diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 6d5e1c55b4..891dd9a3fe 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -115,19 +115,20 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { fn queue_max_retries(&self) -> usize { std::env::var("OPENFANG_QUEUE_MAX_RETRIES") .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or_else(|| { - self.kernel.config.channels.queue_max_retries.unwrap_or(300) as usize - }) + .and_then(|s| s.parse::().ok()) + .or(self.kernel.config.channels.queue_max_retries) + .filter(|retries| *retries > 0) + .and_then(|retries| usize::try_from(retries).ok()) + .unwrap_or(300) } fn queue_sleep_secs(&self) -> u64 { std::env::var("OPENFANG_QUEUE_SLEEP_SECS") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or_else(|| { - self.kernel.config.channels.queue_sleep_secs.unwrap_or(2) - }) + .or(self.kernel.config.channels.queue_sleep_secs) + .filter(|secs| *secs > 0) + .unwrap_or(2) } async fn is_agent_busy(&self, agent_id: AgentId) -> bool { @@ -169,15 +170,11 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { } fn queue_poll_secs(&self) -> u64 { - self.kernel - .config - .channels - .queue_poll_secs - .or_else(|| { - std::env::var("OPENFANG_QUEUE_POLL_SECS") - .ok() - .and_then(|s| s.parse().ok()) - }) + std::env::var("OPENFANG_QUEUE_POLL_SECS") + .ok() + .and_then(|s| s.parse().ok()) + .or(self.kernel.config.channels.queue_poll_secs) + .filter(|secs| *secs > 0) .unwrap_or(30) } diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index f9712f7b86..0f2ad0d221 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -842,99 +842,162 @@ async fn process_queued_messages( adapter_arc: &Arc, router: &Arc, ) { - let _guard = get_queue_mutex().lock().await; - let mut queue = read_queue(handle.as_ref()).await; - if queue.is_empty() { - return; - } - - let mut processed_any = false; - for i in 0..queue.len() { - let q_msg = &queue[i]; - if q_msg.message.channel != adapter.channel_type() { - continue; + let selected = { + let _guard = get_queue_mutex().lock().await; + let queue = read_queue(handle.as_ref()).await; + if queue.is_empty() { + return; } - let agent_id = match q_msg.message.target_agent { - Some(id) => id, - None => { - let channel_key = q_msg.message.channel_id().unwrap_or_else(|| q_msg.message.sender.platform_id.clone()); - match router.resolve_channel_agent(&channel_key).await { - Some(id) => id, - None => { - match handle.find_agent_by_name("assistant").await.ok().flatten() { + let mut selected = None; + for q_msg in queue.iter() { + if q_msg.message.channel != adapter.channel_type() { + continue; + } + + let agent_id = match q_msg.message.target_agent { + Some(id) => id, + None => { + let target_agent_name = q_msg + .message + .metadata + .get("target_agent_name") + .and_then(|v| v.as_str()); + let routed_by_name = if let Some(name) = target_agent_name { + handle.find_agent_by_name(name).await.ok().flatten() + } else { + None + }; + let binding_ctx = binding_context_for(&q_msg.message); + match routed_by_name.or_else(|| { + router.resolve_with_context( + &q_msg.message.channel, + sender_user_id(&q_msg.message), + q_msg.message.sender.openfang_user.as_deref(), + &binding_ctx, + ) + }) { + Some(id) => id, + None => match handle.find_agent_by_name("assistant").await.ok().flatten() { Some(id) => id, - None => match handle.list_agents().await.ok().and_then(|a| a.first().map(|(id, _)| *id)) { + None => match handle + .list_agents() + .await + .ok() + .and_then(|a| a.first().map(|(id, _)| *id)) + { Some(id) => id, None => continue, - } - } + }, + }, } } + }; + + if handle.is_agent_busy(agent_id).await { + continue; } - }; - if handle.is_agent_busy(agent_id).await { - continue; + selected = Some((q_msg.clone(), agent_id)); + break; } - let popped = queue.remove(i); - write_queue(handle.as_ref(), &queue).await; - - drop(_guard); - - let msg = popped.message; - let prefixed_text = popped.prefixed_text; - let thread_id = msg.thread_id.as_deref(); - let output_format = popped.output_format; - let lifecycle_reactions = popped.lifecycle_reactions; - let msg_id = &msg.platform_message_id; - - info!( - channel = %adapter.name(), - agent_id = %agent_id, - "Picking up persistently queued message: {}", - openfang_types::truncate_str(&prefixed_text, 64) - ); + selected + }; - let _ = adapter.send_typing(&msg.sender).await; - if lifecycle_reactions { - send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Thinking).await; - } + let Some((popped, agent_id)) = selected else { + return; + }; - let typing_task = spawn_typing_loop(adapter_arc.clone(), msg.sender.clone()); + let msg = popped.message; + let prefixed_text = popped.prefixed_text; + let thread_id = msg.thread_id.as_deref(); + let output_format = popped.output_format; + let lifecycle_reactions = popped.lifecycle_reactions; + let msg_id = &msg.platform_message_id; - let result = handle.send_message(agent_id, &prefixed_text).await; + info!( + channel = %adapter.name(), + agent_id = %agent_id, + "Picking up persistently queued message: {}", + openfang_types::truncate_str(&prefixed_text, 64) + ); - typing_task.abort(); + let _ = adapter.send_typing(&msg.sender).await; + if lifecycle_reactions { + send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Thinking).await; + } - match result { - Ok(response) => { - if lifecycle_reactions { - send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Done).await; - } - let overrides = handle.channel_overrides(adapter.name()).await; - let text_prefixed = maybe_prefix_response(handle, overrides.as_ref(), agent_id, response).await; - send_response(adapter, &msg.sender, text_prefixed, thread_id, output_format).await; - handle.record_delivery(agent_id, &format!("{:?}", msg.channel), &msg.sender.platform_id, true, None, thread_id).await; + let typing_task = spawn_typing_loop(adapter_arc.clone(), msg.sender.clone()); + let result = handle.send_message(agent_id, &prefixed_text).await; + typing_task.abort(); + + let should_remove = result.is_ok() + || result + .as_ref() + .err() + .map(|e| !is_busy_error(e)) + .unwrap_or(false); + + if should_remove { + let _guard = get_queue_mutex().lock().await; + let mut queue = read_queue(handle.as_ref()).await; + queue.retain(|q| q.id != popped.id); + write_queue(handle.as_ref(), &queue).await; + } + + match result { + Ok(response) => { + if lifecycle_reactions { + send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Done).await; } - Err(e) => { - if lifecycle_reactions { - send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Error).await; - } - let err_msg = sanitize_agent_error(&e); - if !adapter.suppress_error_responses() { - send_response(adapter, &msg.sender, err_msg.clone(), thread_id, output_format).await; - } - handle.record_delivery(agent_id, &format!("{:?}", msg.channel), &msg.sender.platform_id, false, Some(&err_msg), thread_id).await; + let overrides = handle.channel_overrides(adapter.name()).await; + let text_prefixed = + maybe_prefix_response(handle, overrides.as_ref(), agent_id, response).await; + send_response(adapter, &msg.sender, text_prefixed, thread_id, output_format).await; + handle + .record_delivery( + agent_id, + &format!("{:?}", msg.channel), + &msg.sender.platform_id, + true, + None, + thread_id, + ) + .await; + } + Err(e) if is_busy_error(&e) => { + if lifecycle_reactions { + send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Queued).await; } } - - processed_any = true; - break; + Err(e) => { + if lifecycle_reactions { + send_lifecycle_reaction(adapter, &msg.sender, msg_id, AgentPhase::Error).await; + } + let err_msg = sanitize_agent_error(&e); + if !adapter.suppress_error_responses() { + send_response(adapter, &msg.sender, err_msg.clone(), thread_id, output_format) + .await; + } + handle + .record_delivery( + agent_id, + &format!("{:?}", msg.channel), + &msg.sender.platform_id, + false, + Some(&err_msg), + thread_id, + ) + .await; + } } } +fn is_busy_error(error: &str) -> bool { + error.contains("Agent busy") || error.contains("AgentBusy") +} + async fn send_message_queued( handle: &Arc, agent_id: AgentId, @@ -944,7 +1007,7 @@ async fn send_message_queued( let max_retries = handle.queue_max_retries(); loop { match handle.send_message(agent_id, message).await { - Err(e) if e.contains("Agent busy") || e.contains("AgentBusy") => { + Err(e) if is_busy_error(&e) => { retries += 1; if retries >= max_retries { return Err("Agent is busy and timed out waiting for the queue.".to_string()); @@ -969,7 +1032,7 @@ async fn send_message_with_blocks_queued( let max_retries = handle.queue_max_retries(); loop { match handle.send_message_with_blocks(agent_id, blocks.clone()).await { - Err(e) if e.contains("Agent busy") || e.contains("AgentBusy") => { + Err(e) if is_busy_error(&e) => { retries += 1; if retries >= max_retries { return Err("Agent is busy and timed out waiting for the queue.".to_string()); @@ -1570,21 +1633,27 @@ async fn dispatch_message( if handle.queue_enabled() && handle.is_agent_busy(agent_id).await { let _guard = get_queue_mutex().lock().await; let mut queue = read_queue(handle.as_ref()).await; - + let is_dup = queue.iter().any(|q| { q.message.channel == message.channel && q.message.platform_message_id == message.platform_message_id }); - - if !is_dup { - let position = queue.iter().filter(|q| { - let q_aid = q.message.target_agent.unwrap_or(agent_id); - q_aid == agent_id - }).count() + 1; + if !is_dup { + let position = queue + .iter() + .filter(|q| { + let q_aid = q.message.target_agent.unwrap_or(agent_id); + q_aid == agent_id + }) + .count() + + 1; + + let mut queued_message = message.clone(); + queued_message.target_agent = Some(agent_id); let queued_item = QueuedMessage { id: Uuid::new_v4().to_string(), - message: message.clone(), + message: queued_message, prefixed_text: prefixed_text.clone(), output_format, lifecycle_reactions, @@ -1596,14 +1665,15 @@ async fn dispatch_message( let notify_msg = format!( "⏳ Agent is currently busy. Your message has been persistently queued (Position #{position}). I will reply as soon as I'm free!" ); - + let _ = adapter.send_typing(&message.sender).await; if lifecycle_reactions { send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Queued).await; } - + send_response(adapter, &message.sender, notify_msg, thread_id, output_format).await; } + typing_task.abort(); return; } @@ -3171,16 +3241,17 @@ mod tests { assert_eq!(q.len(), 0); let msg = ChannelMessage { - channel: openfang_types::config::ChannelType::Telegram, + channel: ChannelType::Telegram, platform_message_id: "msg_123".to_string(), thread_id: None, - sender: openfang_types::message::ChannelUser { + sender: ChannelUser { platform_id: "user_456".to_string(), display_name: "Test User".to_string(), openfang_user: None, }, - content: openfang_types::message::ChannelContent::Text("Hello".to_string()), + content: ChannelContent::Text("Hello".to_string()), target_agent: None, + timestamp: chrono::Utc::now(), is_group: false, metadata: std::collections::HashMap::new(), }; @@ -3189,7 +3260,7 @@ mod tests { id: "uuid_1".to_string(), message: msg.clone(), prefixed_text: "[From: Test User] Hello".to_string(), - output_format: openfang_types::config::OutputFormat::Text, + output_format: OutputFormat::PlainText, lifecycle_reactions: false, created_at: chrono::Utc::now(), }; diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index b6f4d44be0..fba0bac9af 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -40,6 +40,38 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, OnceLock, Weak}; use tracing::{debug, info, warn}; +struct AgentThinkingGuard<'a> { + registry: &'a AgentRegistry, + agent_id: AgentId, + active: bool, +} + +impl<'a> AgentThinkingGuard<'a> { + fn new(registry: &'a AgentRegistry, agent_id: AgentId) -> KernelResult { + registry + .set_state(agent_id, AgentState::Thinking) + .map_err(|e| KernelError::OpenFang(OpenFangError::Registry(e.to_string())))?; + Ok(Self { + registry, + agent_id, + active: true, + }) + } + + fn finish(mut self, state: AgentState) { + self.active = false; + let _ = self.registry.set_state(self.agent_id, state); + } +} + +impl Drop for AgentThinkingGuard<'_> { + fn drop(&mut self) { + if self.active { + let _ = self.registry.set_state(self.agent_id, AgentState::Running); + } + } +} + /// The main OpenFang kernel — coordinates all subsystems. /// Stub LLM driver used when no providers are configured. /// Returns a helpful error so the dashboard still boots and users can configure providers. @@ -1915,7 +1947,9 @@ impl OpenFangKernel { })?; if entry.state == AgentState::Thinking { - return Err(KernelError::OpenFang(OpenFangError::AgentBusy(agent_id.to_string()))); + return Err(KernelError::OpenFang(OpenFangError::AgentBusy( + agent_id.to_string(), + ))); } // Enforce quota before running the agent loop @@ -1924,9 +1958,9 @@ impl OpenFangKernel { .map_err(KernelError::OpenFang)?; // Set state to Thinking and release lock to allow other messages (e.g. status/cancel) - // to reach the kernel without hanging the transport layer. - self.registry.set_state(agent_id, AgentState::Thinking) - .map_err(|e| KernelError::OpenFang(OpenFangError::Registry(e.to_string())))?; + // to reach the kernel without hanging the transport layer. The guard clears + // Thinking if this future is aborted or dropped while the agent loop is running. + let thinking_guard = AgentThinkingGuard::new(&self.registry, agent_id)?; drop(_guard); @@ -1951,15 +1985,20 @@ impl OpenFangKernel { sender_id, sender_name, Some(&self.config.runtime), - ).await + ) + .await }; // Re-acquire lock to commit state and results let _guard = lock.lock().await; // Reset state back to Running (or Crashed if it failed) - let final_state = if result.is_ok() { AgentState::Running } else { AgentState::Crashed }; - let _ = self.registry.set_state(agent_id, final_state); + let final_state = if result.is_ok() { + AgentState::Running + } else { + AgentState::Crashed + }; + thinking_guard.finish(final_state); match result { Ok(result) => { diff --git a/crates/openfang-runtime/src/agent_loop.rs b/crates/openfang-runtime/src/agent_loop.rs index 3a6752ad1e..19745030c6 100644 --- a/crates/openfang-runtime/src/agent_loop.rs +++ b/crates/openfang-runtime/src/agent_loop.rs @@ -66,12 +66,19 @@ fn env_timeout_secs(var: &str) -> Option { /// to `0`. In that case the tool runs with no upper bound, which is what users /// on slow local inference (vLLM on old GPUs) want for Hands and inter-agent /// delegation (issue #1125). -fn tool_timeout_for(tool_name: &str, runtime_config: Option<&openfang_types::config::RuntimeConfig>) -> Option { +fn tool_timeout_for( + tool_name: &str, + runtime_config: Option<&openfang_types::config::RuntimeConfig>, +) -> Option { let secs = match tool_name { "agent_send" | "agent_spawn" => { - runtime_config.map(|c| c.agent_tool_timeout_secs).unwrap_or(env_timeout_secs("OPENFANG_AGENT_TOOL_TIMEOUT_SECS").unwrap_or(AGENT_TOOL_TIMEOUT_SECS)) + env_timeout_secs("OPENFANG_AGENT_TOOL_TIMEOUT_SECS") + .or_else(|| runtime_config.map(|c| c.agent_tool_timeout_secs)) + .unwrap_or(AGENT_TOOL_TIMEOUT_SECS) } - _ => env_timeout_secs("OPENFANG_TOOL_TIMEOUT_SECS").unwrap_or(runtime_config.map(|c| c.tool_timeout_secs).unwrap_or(TOOL_TIMEOUT_SECS)), + _ => env_timeout_secs("OPENFANG_TOOL_TIMEOUT_SECS") + .or_else(|| runtime_config.map(|c| c.tool_timeout_secs)) + .unwrap_or(TOOL_TIMEOUT_SECS), }; if secs == 0 { None @@ -482,7 +489,13 @@ pub async fn run_agent_loop( } // Use autonomous config max_iterations if set, else default - let max_iterations = runtime_config.map(|c| c.max_iterations).or(manifest.autonomous.as_ref().map(|a| a.max_iterations)).unwrap_or(MAX_ITERATIONS); + let max_iterations = runtime_config + .map(|c| c.max_iterations) + .or(manifest.autonomous.as_ref().map(|a| a.max_iterations)) + .unwrap_or(MAX_ITERATIONS); + let max_continuations = runtime_config + .map(|c| c.max_continuations) + .unwrap_or(MAX_CONTINUATIONS); // Initialize loop guard — scale circuit breaker for autonomous agents let loop_guard_config = { @@ -1060,7 +1073,7 @@ pub async fn run_agent_loop( } StopReason::MaxTokens => { consecutive_max_tokens += 1; - if consecutive_max_tokens >= MAX_CONTINUATIONS { + if consecutive_max_tokens >= max_continuations { // Return partial response instead of continuing forever let text = response.text(); let text = if text.trim().is_empty() { @@ -1700,7 +1713,13 @@ pub async fn run_agent_loop_streaming( } // Use autonomous config max_iterations if set, else default - let max_iterations = runtime_config.map(|c| c.max_iterations).or(manifest.autonomous.as_ref().map(|a| a.max_iterations)).unwrap_or(MAX_ITERATIONS); + let max_iterations = runtime_config + .map(|c| c.max_iterations) + .or(manifest.autonomous.as_ref().map(|a| a.max_iterations)) + .unwrap_or(MAX_ITERATIONS); + let max_continuations = runtime_config + .map(|c| c.max_continuations) + .unwrap_or(MAX_CONTINUATIONS); // Initialize loop guard — scale circuit breaker for autonomous agents let loop_guard_config = { @@ -2281,7 +2300,7 @@ pub async fn run_agent_loop_streaming( } StopReason::MaxTokens => { consecutive_max_tokens += 1; - if consecutive_max_tokens >= MAX_CONTINUATIONS { + if consecutive_max_tokens >= max_continuations { let text = response.text(); let text = if text.trim().is_empty() { "[Partial response — token limit reached with no text output.]".to_string()