Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 52 additions & 10 deletions docs/adr/line-adapter.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# ADR: LINE Messaging API Adapter

- **Status:** Proposed
- **Status:** Accepted
- **Date:** 2026-04-22
- **Author:** @chaodu-agent
- **Last Updated:** 2026-04-28
- **Author:** @chaodu-agent, @iamninihuang

---

Expand Down Expand Up @@ -101,17 +102,57 @@ LINE is not ideal when:
- Group → line:{groupId}
- Room → line:{roomId}
6. Message is routed to AdapterRouter → ACP Session Pool → kiro-cli process
7. Agent response is sent back via LINE Push Message API
7. Agent response is sent back via LINE Reply API (free) or Push Message API (fallback)
```

### Reply Strategy: Push Messages
### Hybrid Reply/Push Dispatch Flow

LINE offers two reply mechanisms:
- **Reply message**: uses a reply token, but the token expires in 1 minute
- **Push message**: no time limit, can send to any user/group at any time
```
LINE User Gateway OAB Core
│ │ │
│ message + replyToken │ │
│ ─────────────────────────▶ │ │
│ │ 1. Verify HMAC signature │
│ │ 2. Generate event_id (UUID) │
│ │ 3. Cache: │
│ │ event_id → replyToken │
│ │ (TTL 50s, max 10k) │
│ │ │
│ │ GatewayEvent { event_id } │
│ │ ─────────────────────────────▶│
│ │ │ Store event_id in
│ │ │ ChannelRef.origin_event_id
│ │ │
│ │ │ Agent processes...
│ │ │
│ │ GatewayReply { │
│ │ reply_to: event_id │
│ │ } │
│ │ ◀─────────────────────────────│
│ │ │
│ │ 4. Lookup cache(event_id) │
│ │ ├─ HIT + fresh │
│ Reply API (FREE) ✅ │ │ → Reply API │
│ ◀──────────────────────────│ │ │
│ │ ├─ HIT + expired │
│ Push API (quota) 💰 │ │ → Push API fallback │
│ ◀──────────────────────────│ │ │
│ │ └─ MISS │
│ Push API (quota) 💰 │ → Push API fallback │
│ ◀──────────────────────────│ │
```

OpenAB uses **push messages** because agent processing typically exceeds the 1-minute reply token window. The trade-off is that push messages count against the monthly messaging quota on free-tier LINE accounts.
### Reply Strategy: Hybrid Reply/Push Messages

LINE offers two reply mechanisms:
- **Reply message**: uses a reply token, but the token expires in 1 minute (free).
- **Push message**: no time limit, can send to any user/group at any time (consumes quota).

Historically, OpenAB relied solely on **push messages** because agent processing can exceed the 1-minute reply token window. To optimize costs for free-tier accounts, OpenAB now uses a **Hybrid Strategy** implemented at the gateway level:
1. The gateway caches incoming `replyToken`s keyed by `event_id` with a 50-second TTL.
2. When OAB replies with a non-empty `reply_to` that matches a cached entry, the gateway routes the message via the free **Reply API**.
3. If the token is expired, missing, or `reply_to` is empty, the gateway falls back to the **Push API**.
4. A background task sweeps expired cache entries to prevent memory growth.
---

## 3. Architectural Differences from Discord/Slack
Expand Down Expand Up @@ -381,7 +422,7 @@ For v1:
- LINE users can interact with OpenAB agents without switching to Discord or Slack
- The inbound webhook pattern opens the door for future webhook-based platforms (Telegram, WhatsApp, etc.)
- Using `axum` for the HTTP server provides a solid foundation for a general-purpose webhook gateway
- Push message strategy avoids the 1-minute reply token limitation, enabling long-running agent tasks
- Hybrid reply/push strategy optimizes cost: the gateway opportunistically uses the free Reply API when the agent responds within the token TTL, falling back to Push API for longer-running tasks

### Negative

Expand Down Expand Up @@ -414,8 +455,9 @@ To ensure this ADR is followed in implementation and future changes:

## Notes

- **Version:** 0.1
- **Version:** 0.2
- **Changelog:**
- 0.2 (2026-04-28): Hybrid Reply/Push strategy implemented (#608). Updated status to Accepted. Added dispatch flow diagram. Reply strategy section rewritten from Push-only to hybrid. Core propagates `event_id` via `ChannelRef.origin_event_id` (#619).
- 0.1 (2026-04-22): Initial proposed version

---
Expand Down
3 changes: 3 additions & 0 deletions gateway/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ url = "ws://gateway:8080/ws"
| `TELEGRAM_BOT_TOKEN` | (required) | Telegram Bot API token |
| `GATEWAY_LISTEN` | `0.0.0.0:8080` | Listen address |
| `TELEGRAM_WEBHOOK_PATH` | `/webhook/telegram` | Webhook endpoint path |
| `LINE_CHANNEL_SECRET` | (optional) | LINE channel secret for webhook HMAC signature verification |
| `LINE_CHANNEL_ACCESS_TOKEN` | (optional) | LINE channel access token for Reply/Push API |

### Endpoints

| Path | Description |
|---|---|
| `POST /webhook/telegram` | Telegram webhook receiver |
| `POST /webhook/line` | LINE webhook receiver |
| `GET /ws` | WebSocket server (OAB connects here) |
| `GET /health` | Health check |

Expand Down
150 changes: 135 additions & 15 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::{
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{broadcast, Mutex};
use tracing::{error, info, warn};

Expand Down Expand Up @@ -126,6 +127,19 @@ struct TelegramUser {

// --- App state ---

/// Cache entry for LINE reply tokens: (replyToken, insertion_time).
/// Uses std::sync::Mutex — critical sections are short (insert/remove/retain)
/// and never held across .await, so async Mutex overhead is unnecessary.
type ReplyTokenCache = Arc<std::sync::Mutex<std::collections::HashMap<String, (String, Instant)>>>;

/// Maximum age (in seconds) before a cached reply token is considered expired.
/// LINE tokens are valid for ~1 minute; we use 50s as a conservative margin.
const REPLY_TOKEN_TTL_SECS: u64 = 50;

/// Maximum number of cached reply tokens. Prevents unbounded memory growth
/// if webhooks arrive faster than OAB can reply (e.g. OAB offline, spam burst).
const REPLY_TOKEN_CACHE_MAX: usize = 10_000;

struct AppState {
bot_token: String,
secret_token: Option<String>,
Expand All @@ -134,6 +148,11 @@ struct AppState {
line_access_token: Option<String>,
/// Broadcast channel: gateway → OAB (events)
event_tx: broadcast::Sender<String>,
/// Cache: event_id → (LINE replyToken, timestamp).
/// Global across all OAB WebSocket clients. LINE reply tokens are single-use:
/// the first client to `remove()` a token wins the free Reply API call;
/// other clients for the same event naturally fall back to Push API.
reply_token_cache: ReplyTokenCache,
}

// --- Telegram webhook handler ---
Expand Down Expand Up @@ -329,9 +348,22 @@ async fn line_webhook(
.and_then(|s| s.user_id.as_deref())
.unwrap_or("unknown");

let event_id = format!("evt_{}", uuid::Uuid::new_v4());

// Cache the reply token for hybrid Reply/Push dispatch
if let Some(ref reply_token) = event.reply_token {
let mut cache = state.reply_token_cache.lock().unwrap_or_else(|e| e.into_inner());
if cache.len() >= REPLY_TOKEN_CACHE_MAX {
warn!(size = cache.len(), "reply token cache full, skipping insert");
} else {
cache.insert(event_id.clone(), (reply_token.clone(), Instant::now()));
info!(event_id = %event_id, "cached LINE replyToken");
}
}

let gateway_event = GatewayEvent {
schema: "openab.gateway.event.v1".into(),
event_id: format!("evt_{}", uuid::Uuid::new_v4()),
event_id,
timestamp: chrono::Utc::now().to_rfc3339(),
platform: "line".into(),
event_type: "message".into(),
Expand Down Expand Up @@ -400,14 +432,14 @@ async fn handle_oab_connection(state: Arc<AppState>, socket: axum::extract::ws::
break;
}
}
// No reply forwarding needed on this path — replies go to Telegram directly
}
}
});

// Receive OAB replies → Telegram
let bot_token = state.bot_token.clone();
let line_access_token = state.line_access_token.clone();
let reply_cache = state.reply_token_cache.clone();
let event_tx_for_recv = state.event_tx.clone();
// Track per-message reaction state (Telegram replaces all reactions atomically)
let reaction_state: Arc<Mutex<std::collections::HashMap<String, Vec<String>>>> =
Expand Down Expand Up @@ -535,19 +567,83 @@ async fn handle_oab_connection(state: Arc<AppState>, socket: axum::extract::ws::

// Normal send_message — route by platform
if reply.platform == "line" {
// LINE Push Message API
if let Some(ref token) = line_access_token {
info!(to = %reply.channel.id, "gateway → line");
let _ = client
.post("https://api.line.me/v2/bot/message/push")
.bearer_auth(token)
.json(&serde_json::json!({
"to": reply.channel.id,
"messages": [{"type": "text", "text": reply.content.text}]
}))
.send()
.await
.map_err(|e| error!("line send error: {e}"));
if let Some(ref access_token) = line_access_token {
// Extract token from cache (drop lock before HTTP call)
let cached_token = {
let mut cache = reply_cache.lock().unwrap_or_else(|e| e.into_inner());
cache
.remove(&reply.reply_to)
.and_then(|(token, cached_at)| {
if cached_at.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS
{
Some(token)
} else {
info!("LINE replyToken expired, using Push API");
None
}
})
};

// Try Reply API first (free, no quota consumed)
let mut used_reply = false;
if let Some(reply_token) = cached_token {
info!(to = %reply.channel.id, "gateway → line (reply API)");
let resp = client
.post("https://api.line.me/v2/bot/message/reply")
.bearer_auth(access_token)
.json(&serde_json::json!({
"replyToken": reply_token,
"messages": [{"type": "text", "text": reply.content.text}]
}))
.send()
.await;
match resp {
Ok(r) if r.status().is_success() => {
used_reply = true;
}
Ok(r) => {
let status = r.status();
let body = r.text().await.unwrap_or_default();
// Only fallback to Push when LINE explicitly says
// the reply token is unusable (invalid/expired).
// LINE returns "Invalid reply token" or "expired"
// in the error body for token-specific failures.
let body_lower = body.to_lowercase();
let token_unusable = status.as_u16() == 400
&& ((body_lower.contains("invalid")
&& body_lower.contains("reply token"))
|| body_lower.contains("expired"));
if token_unusable {
warn!(status = %status, body = %body, "LINE reply token unusable, falling back to Push");
} else {
// Ambiguous: 5xx, other 4xx, or unrecognized 400.
// Message may have been delivered — do NOT fallback.
error!(status = %status, body = %body, "LINE Reply API error, NOT falling back to Push (possible duplicate risk)");
used_reply = true;
}
}
Err(e) => {
// Network/timeout error: delivery ambiguous, do NOT fallback
error!(err = %e, "LINE Reply API network error, NOT falling back to Push (possible duplicate risk)");
used_reply = true;
}
}
}

// Fallback to Push API
if !used_reply {
info!(to = %reply.channel.id, "gateway → line (push API)");
let _ = client
.post("https://api.line.me/v2/bot/message/push")
.bearer_auth(access_token)
.json(&serde_json::json!({
"to": reply.channel.id,
"messages": [{"type": "text", "text": reply.content.text}]
}))
.send()
.await
.map_err(|e| error!("line push error: {e}"));
}
}
} else {
// Telegram sendMessage
Expand Down Expand Up @@ -611,6 +707,8 @@ async fn main() -> Result<()> {
}

let (event_tx, _) = broadcast::channel::<String>(256);
let reply_token_cache: ReplyTokenCache =
Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));

let state = Arc::new(AppState {
bot_token,
Expand All @@ -619,8 +717,30 @@ async fn main() -> Result<()> {
line_channel_secret,
line_access_token,
event_tx,
reply_token_cache,
});

// Background task: sweep expired reply tokens every REPLY_TOKEN_TTL_SECS
{
let cache_state = state.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(REPLY_TOKEN_TTL_SECS)).await;
let mut cache = cache_state.reply_token_cache.lock().unwrap_or_else(|e| e.into_inner());
let before = cache.len();
cache.retain(|_, (_, t)| t.elapsed().as_secs() < REPLY_TOKEN_TTL_SECS);
let after = cache.len();
if before != after {
info!(
removed = before - after,
remaining = after,
"reply token cache sweep"
);
}
}
});
}

let app = Router::new()
.route(&webhook_path, post(telegram_webhook))
.route("/webhook/line", post(line_webhook))
Expand Down
Loading