Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 112 additions & 47 deletions crates/openfang-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ pub struct AppState {
/// Thread-safe mutable budget config. Updated via PUT /api/budget.
/// Initialized from `kernel.config.budget` at startup.
pub budget_config: Arc<tokio::sync::RwLock<openfang_types::config::BudgetConfig>>,
/// Runtime API key store. Replaces `std::env::set_var`/`remove_var` (unsound in
/// multithreaded contexts). Keys written here are checked by `detect_auth_with_keys`
/// and presence checks throughout the API layer. Uses `std::sync::RwLock` (not
/// tokio's) so synchronous helpers like `build_field_json` can read without awaiting.
pub api_keys: Arc<std::sync::RwLock<HashMap<String, String>>>,
}

/// POST /api/agents — Spawn a new agent.
Expand Down Expand Up @@ -2392,15 +2397,18 @@ fn is_channel_configured(config: &openfang_types::config::ChannelsConfig, name:
}
}

/// Build a JSON field descriptor, checking env var presence but never exposing secrets.
/// Build a JSON field descriptor, checking key presence but never exposing secrets.
/// For non-secret fields, includes the actual config value from `config_values` if available.
fn build_field_json(
f: &ChannelField,
config_values: Option<&serde_json::Value>,
api_keys: &HashMap<String, String>,
) -> serde_json::Value {
let has_value = f
.env_var
.map(|ev| std::env::var(ev).map(|v| !v.is_empty()).unwrap_or(false))
.map(|ev| {
api_keys.contains_key(ev) || std::env::var(ev).map(|v| !v.is_empty()).unwrap_or(false)
})
.unwrap_or(false);
let mut field = serde_json::json!({
"key": f.key,
Expand Down Expand Up @@ -2651,6 +2659,7 @@ pub async fn list_channels(State(state): State<Arc<AppState>>) -> impl IntoRespo
// Read the live channels config (updated on every hot-reload) instead of the
// stale boot-time kernel.config, so newly configured channels show correctly.
let live_channels = state.channels_config.read().await;
let api_keys = state.api_keys.read().unwrap_or_else(|e| e.into_inner());
let mut channels = Vec::new();
let mut configured_count = 0u32;

Expand All @@ -2667,15 +2676,18 @@ pub async fn list_channels(State(state): State<Arc<AppState>>) -> impl IntoRespo
.filter(|f| f.required && f.env_var.is_some())
.all(|f| {
f.env_var
.map(|ev| std::env::var(ev).map(|v| !v.is_empty()).unwrap_or(false))
.map(|ev| {
api_keys.contains_key(ev)
|| std::env::var(ev).map(|v| !v.is_empty()).unwrap_or(false)
})
.unwrap_or(true)
});

let config_vals = channel_config_values(&live_channels, meta.name);
let fields: Vec<serde_json::Value> = meta
.fields
.iter()
.map(|f| build_field_json(f, config_vals.as_ref()))
.map(|f| build_field_json(f, config_vals.as_ref(), &api_keys))
.collect();

channels.push(serde_json::json!({
Expand Down Expand Up @@ -2751,10 +2763,13 @@ pub async fn configure_channel(
Json(serde_json::json!({"error": format!("Failed to write secret: {e}")})),
);
}
// SAFETY: We are the only writer; this is a single-threaded config operation
unsafe {
std::env::set_var(env_var, value);
}
// Store in runtime key map so detect_auth_with_keys and presence checks
// reflect the new key without calling set_var (unsound in async context).
state
.api_keys
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(env_var.to_string(), value.to_string());
// Also write the env var NAME to config.toml so the channel section
// is not empty and the kernel knows which env var to read.
config_fields.insert(
Expand Down Expand Up @@ -2835,10 +2850,11 @@ pub async fn remove_channel(
for field_def in meta.fields {
if let Some(env_var) = field_def.env_var {
let _ = remove_secret_env(&secrets_path, env_var);
// SAFETY: Single-threaded config operation
unsafe {
std::env::remove_var(env_var);
}
state
.api_keys
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(env_var);
}
}

Expand Down Expand Up @@ -2881,6 +2897,7 @@ pub async fn remove_channel(
/// (for Telegram). When provided, sends a real test message to verify the bot can
/// post to that channel.
pub async fn test_channel(
State(state): State<Arc<AppState>>,
Path(name): Path<String>,
raw_body: axum::body::Bytes,
) -> impl IntoResponse {
Expand All @@ -2894,12 +2911,23 @@ pub async fn test_channel(
}
};

// Clone the keys map immediately so no RwLockReadGuard (which is !Send)
// is held across any await point in this async function.
let api_keys: HashMap<String, String> = state
.api_keys
.read()
.unwrap_or_else(|e| e.into_inner())
.clone();
// Check all required env vars are set
let mut missing = Vec::new();
for field_def in meta.fields {
if field_def.required {
if let Some(env_var) = field_def.env_var {
if std::env::var(env_var).map(|v| v.is_empty()).unwrap_or(true) {
let has = api_keys.contains_key(env_var)
|| std::env::var(env_var)
.map(|v| !v.is_empty())
.unwrap_or(false);
if !has {
missing.push(env_var);
}
}
Expand Down Expand Up @@ -2929,7 +2957,7 @@ pub async fn test_channel(
.map(|s| s.to_string());

if let Some(target_id) = target {
match send_channel_test_message(&name, &target_id).await {
match send_channel_test_message(&name, &target_id, &api_keys).await {
Ok(()) => {
return (
StatusCode::OK,
Expand Down Expand Up @@ -2961,14 +2989,24 @@ pub async fn test_channel(
}

/// Send a real test message to a specific channel/chat on the given platform.
async fn send_channel_test_message(channel_name: &str, target_id: &str) -> Result<(), String> {
async fn send_channel_test_message(
channel_name: &str,
target_id: &str,
api_keys: &HashMap<String, String>,
) -> Result<(), String> {
let resolve = |var: &str| -> Result<String, String> {
api_keys
.get(var)
.cloned()
.or_else(|| std::env::var(var).ok().filter(|v| !v.is_empty()))
.ok_or_else(|| format!("{var} not set"))
};
let client = reqwest::Client::new();
let test_msg = "OpenFang test message — your channel is connected!";

match channel_name {
"discord" => {
let token = std::env::var("DISCORD_BOT_TOKEN")
.map_err(|_| "DISCORD_BOT_TOKEN not set".to_string())?;
let token = resolve("DISCORD_BOT_TOKEN")?;
let url = format!("https://discord.com/api/v10/channels/{target_id}/messages");
let resp = client
.post(&url)
Expand All @@ -2983,8 +3021,7 @@ async fn send_channel_test_message(channel_name: &str, target_id: &str) -> Resul
}
}
"telegram" => {
let token = std::env::var("TELEGRAM_BOT_TOKEN")
.map_err(|_| "TELEGRAM_BOT_TOKEN not set".to_string())?;
let token = resolve("TELEGRAM_BOT_TOKEN")?;
let url = format!("https://api.telegram.org/bot{token}/sendMessage");
let resp = client
.post(&url)
Expand All @@ -2998,8 +3035,7 @@ async fn send_channel_test_message(channel_name: &str, target_id: &str) -> Resul
}
}
"slack" => {
let token = std::env::var("SLACK_BOT_TOKEN")
.map_err(|_| "SLACK_BOT_TOKEN not set".to_string())?;
let token = resolve("SLACK_BOT_TOKEN")?;
let url = "https://slack.com/api/chat.postMessage";
let resp = client
.post(url)
Expand Down Expand Up @@ -7705,16 +7741,24 @@ pub async fn set_provider_key(
);
}

// Set env var in current process so detect_auth picks it up
std::env::set_var(&env_var, &key);

// Refresh auth detection
// Store in runtime key map so detect_auth_with_keys and presence checks
// reflect the new key without calling set_var (unsound in async context).
state
.kernel
.model_catalog
.api_keys
.write()
.unwrap_or_else(|e| e.into_inner())
.detect_auth();
.insert(env_var.clone(), key.clone());

// Refresh auth detection
{
let keys = state.api_keys.read().unwrap_or_else(|e| e.into_inner());
state
.kernel
.model_catalog
.write()
.unwrap_or_else(|e| e.into_inner())
.detect_auth_with_keys(&keys);
}

// Auto-switch default provider if current default has no working key.
// This fixes the common case where a user adds e.g. a Gemini key via dashboard
Expand All @@ -7740,10 +7784,11 @@ pub async fn set_provider_key(
let current_has_key = if current_key_env.is_empty() {
false
} else {
std::env::var(&current_key_env)
.ok()
.filter(|v| !v.is_empty())
.is_some()
state
.api_keys
.read()
.unwrap_or_else(|e| e.into_inner())
.contains_key(&current_key_env)
};
let switched = if !current_has_key && current_provider != name {
// Find a default model for the newly-keyed provider
Expand Down Expand Up @@ -7877,16 +7922,23 @@ pub async fn delete_provider_key(
);
}

// Remove from process environment
std::env::remove_var(&env_var);

// Refresh auth detection
// Remove from runtime key map
state
.kernel
.model_catalog
.api_keys
.write()
.unwrap_or_else(|e| e.into_inner())
.detect_auth();
.remove(&env_var);

// Refresh auth detection
{
let keys = state.api_keys.read().unwrap_or_else(|e| e.into_inner());
state
.kernel
.model_catalog
.write()
.unwrap_or_else(|e| e.into_inner())
.detect_auth_with_keys(&keys);
}

(
StatusCode::OK,
Expand Down Expand Up @@ -7927,7 +7979,13 @@ pub async fn test_provider(
}
};

let api_key = std::env::var(&env_var).ok();
let api_key = state
.api_keys
.read()
.unwrap_or_else(|e| e.into_inner())
.get(&env_var)
.cloned()
.or_else(|| state.kernel.resolve_credential(&env_var));
// Only require API key for providers that need one (skip local providers like ollama/vllm/lmstudio)
if key_required && api_key.is_none() && !env_var.is_empty() {
return (
Expand Down Expand Up @@ -12061,16 +12119,23 @@ pub async fn copilot_oauth_poll(
);
}

// Set in current process
std::env::set_var("GITHUB_TOKEN", access_token.as_str());

// Refresh auth detection
// Store in runtime key map (avoids set_var which is unsound in async context)
state
.kernel
.model_catalog
.api_keys
.write()
.unwrap_or_else(|e| e.into_inner())
.detect_auth();
.insert("GITHUB_TOKEN".to_string(), access_token.to_string());

// Refresh auth detection
{
let keys = state.api_keys.read().unwrap_or_else(|e| e.into_inner());
state
.kernel
.model_catalog
.write()
.unwrap_or_else(|e| e.into_inner())
.detect_auth_with_keys(&keys);
}

// Clean up flow state
COPILOT_FLOWS.remove(&poll_id);
Expand Down
20 changes: 20 additions & 0 deletions crates/openfang-api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::webchat;
use crate::ws;
use axum::Router;
use openfang_kernel::OpenFangKernel;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
Expand Down Expand Up @@ -41,6 +42,24 @@ pub async fn build_router(
// Start channel bridges (Telegram, etc.)
let bridge = channel_bridge::start_channel_bridge(kernel.clone()).await;

// Pre-populate the runtime API key map from the credential resolver so keys
// stored in vault or secrets.env are visible at startup without set_var.
let api_keys = {
let mut map = HashMap::new();
let catalog = kernel
.model_catalog
.read()
.unwrap_or_else(|e| e.into_inner());
for provider in catalog.list_providers() {
if !provider.api_key_env.is_empty() {
if let Some(val) = kernel.resolve_credential(&provider.api_key_env) {
map.insert(provider.api_key_env.clone(), val);
}
}
}
Arc::new(std::sync::RwLock::new(map))
};

let channels_config = kernel.config.channels.clone();
let state = Arc::new(AppState {
kernel: kernel.clone(),
Expand All @@ -52,6 +71,7 @@ pub async fn build_router(
clawhub_cache: dashmap::DashMap::new(),
provider_probe_cache: openfang_runtime::provider_health::ProbeCache::new(),
budget_config: Arc::new(tokio::sync::RwLock::new(kernel.config.budget.clone())),
api_keys,
});

// Start WS cron broadcaster — subscribes to kernel event bus and pushes
Expand Down
3 changes: 3 additions & 0 deletions crates/openfang-api/tests/api_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use openfang_api::routes::{self, AppState};
use openfang_api::ws;
use openfang_kernel::OpenFangKernel;
use openfang_types::config::{DefaultModelConfig, KernelConfig};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tower_http::cors::CorsLayer;
Expand Down Expand Up @@ -80,6 +81,7 @@ async fn start_test_server_with_provider(
clawhub_cache: dashmap::DashMap::new(),
provider_probe_cache: openfang_runtime::provider_health::ProbeCache::new(),
budget_config: Arc::new(tokio::sync::RwLock::new(Default::default())),
api_keys: Arc::new(std::sync::RwLock::new(HashMap::new())),
});

let app = Router::new()
Expand Down Expand Up @@ -925,6 +927,7 @@ async fn start_test_server_with_auth(api_key: &str) -> TestServer {
clawhub_cache: dashmap::DashMap::new(),
provider_probe_cache: openfang_runtime::provider_health::ProbeCache::new(),
budget_config: Arc::new(tokio::sync::RwLock::new(Default::default())),
api_keys: Arc::new(std::sync::RwLock::new(HashMap::new())),
});

let api_key = state.kernel.config.api_key.trim().to_string();
Expand Down
3 changes: 3 additions & 0 deletions crates/openfang-api/tests/daemon_lifecycle_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use openfang_api::routes::{self, AppState};
use openfang_api::server::{read_daemon_info, DaemonInfo};
use openfang_kernel::OpenFangKernel;
use openfang_types::config::{DefaultModelConfig, KernelConfig};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tower_http::cors::CorsLayer;
Expand Down Expand Up @@ -117,6 +118,7 @@ async fn test_full_daemon_lifecycle() {
clawhub_cache: dashmap::DashMap::new(),
provider_probe_cache: openfang_runtime::provider_health::ProbeCache::new(),
budget_config: Arc::new(tokio::sync::RwLock::new(Default::default())),
api_keys: Arc::new(std::sync::RwLock::new(HashMap::new())),
});

let app = Router::new()
Expand Down Expand Up @@ -244,6 +246,7 @@ async fn test_server_immediate_responsiveness() {
clawhub_cache: dashmap::DashMap::new(),
provider_probe_cache: openfang_runtime::provider_health::ProbeCache::new(),
budget_config: Arc::new(tokio::sync::RwLock::new(Default::default())),
api_keys: Arc::new(std::sync::RwLock::new(HashMap::new())),
});

let app = Router::new()
Expand Down
Loading