diff --git a/src/claude_monitor/core/models.py b/src/claude_monitor/core/models.py
index 4cbe9b8..a4c349d 100644
--- a/src/claude_monitor/core/models.py
+++ b/src/claude_monitor/core/models.py
@@ -29,6 +29,7 @@ class UsageEntry:
     model: str = ""
     message_id: str = ""
     request_id: str = ""
+    keyword: Optional[str] = None  # short summary of the prompt (first few words)
 
 
 @dataclass
diff --git a/src/claude_monitor/core/monitor_ui.py b/src/claude_monitor/core/monitor_ui.py
new file mode 100644
index 0000000..7466ff5
--- /dev/null
+++ b/src/claude_monitor/core/monitor_ui.py
@@ -0,0 +1,228 @@
+#!/usr/bin/env python3
+"""
+Claude Code Usage Monitor (Interactive Peek Edition)
+by Sylvester Assiamah (AssiamahS)
+───────────────────────────────────────────────
+✅ Adds total tokens spent + estimated cost
+✅ Shows last N messages with global numbering and text preview
+✅ Press 1 | 2 | 3 to toggle last 3 / 15 / 30 messages
+✅ Keeps Rich live dashboard
+"""
+
+import json
+import os
+import sys
+import termios
+import threading
+import time
+import tty
+from datetime import datetime, timedelta, timezone
+from select import select
+
+from rich.console import Console, Group
+from rich.live import Live
+from rich.panel import Panel
+from rich.progress import BarColumn, Progress, TextColumn
+from rich.table import Table
+
+sys.path.append(os.path.expanduser("~/code/Claude-Code-Usage-Monitor/src"))
+from claude_monitor.data.reader import load_usage_entries
+
+console = Console()
+
+# ─────────────────────────────────────────────
+# Global State
+# ─────────────────────────────────────────────
+last_n_display = 3
+stop_listen = False
+
+# ─────────────────────────────────────────────
+# Keyboard listener
+# ─────────────────────────────────────────────
+def listen_for_keys():
+    """Watch stdin in cbreak mode so single keypresses switch the view."""
+    global last_n_display, stop_listen
+    fd = sys.stdin.fileno()
+    old = termios.tcgetattr(fd)
+    tty.setcbreak(fd)
+    try:
+        while not stop_listen:
+            # Poll with a short timeout so the loop can notice stop_listen.
+            if sys.stdin in select([sys.stdin], [], [], 0.1)[0]:
+                ch = sys.stdin.read(1)
+                if ch == "1":
+                    last_n_display = 3
+                elif ch == "2":
+                    last_n_display = 15
+                elif ch == "3":
+                    last_n_display = 30
+                elif ch.lower() == "q":
+                    stop_listen = True
+    finally:
+        termios.tcsetattr(fd, termios.TCSADRAIN, old)
+
+
+# ─────────────────────────────────────────────
+# Helpers
+# ─────────────────────────────────────────────
+def get_recent_entries(hours=24):
+    """Return UsageEntry objects from the last `hours`, normalized to UTC."""
+    entries, _ = load_usage_entries()
+    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
+    recent = []
+    for e in entries:
+        ts = e.timestamp
+        if ts.tzinfo is None:
+            ts = ts.replace(tzinfo=timezone.utc)
+        if ts > cutoff:
+            recent.append(e)
+    return recent
+
+
+def load_recent_raw_prompts(limit=50):
+    """Read the last `limit` raw prompt lines from a usage.jsonl, if one exists."""
+    paths = [
+        os.path.expanduser("~/.claude/code/usage/usage.jsonl"),
+        os.path.expanduser("~/Library/Application Support/Claude/code/usage/usage.jsonl"),
+    ]
+    path = next((p for p in paths if os.path.exists(p)), None)
+    if not path:
+        return []
+
+    lines = []
+    with open(path, "r", encoding="utf-8") as f:
+        for line in f:
+            try:
+                lines.append(json.loads(line))
+            except json.JSONDecodeError:
+                pass
+    lines = lines[-limit:]
+
+    result = []
+    for entry in lines:
+        ts = entry.get("timestamp")
+        text = entry.get("text") or entry.get("prompt") or ""
+        if not ts:
+            continue  # skip undated entries rather than reusing a stale dt
+        try:
+            dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
+        except ValueError:
+            continue
+        result.append({"timestamp": dt, "text": text})
+    return result
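+
+# The helper above assumes usage.jsonl lines shaped roughly like this
+# (field names mirror the lookups above; real logs may carry more keys):
+#   {"timestamp": "2025-01-01T12:00:00Z", "text": "fix the login page"}
+#   {"timestamp": "2025-01-01T12:05:00Z", "prompt": "now add tests"}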
+
+
+# ─────────────────────────────────────────────
+# Rendering
+# ─────────────────────────────────────────────
+def render_usage_panel():
+    """Top dashboard panel: totals, progress bar, and estimated cost."""
+    entries = get_recent_entries()
+    if not entries:
+        return Panel("No usage data found yet.",
+                     title="📊 Claude Code Usage Monitor",
+                     border_style="yellow")
+
+    total_tokens = sum(e.input_tokens + e.output_tokens for e in entries)
+    total_messages = len(entries)
+    cost_per_token = 0.000002  # rough flat-rate assumption, not real per-model pricing
+    total_cost = total_tokens * cost_per_token
+    limit = 19000  # assumed session budget; adjust to your plan
+    remaining = max(0, limit - total_tokens)
+    percent = min(total_tokens / limit, 1.0)
+
+    progress = Progress(
+        TextColumn("💰 Tokens Used:"),
+        BarColumn(bar_width=40),
+        TextColumn(f"{total_tokens:,} / {limit:,}"),
+        expand=False,
+    )
+    progress.add_task("", total=1.0, completed=percent)
+
+    tbl = Table.grid(expand=True)
+    tbl.add_row(f"🕒 Last {len(entries)} messages", f"💲 Est. Cost: ${total_cost:.2f}")
+    tbl.add_row(f"🧮 Tokens Remaining: {remaining:,}", f"📨 Messages: {total_messages}")
+    tbl.add_row(
+        f"[cyan]🧠 Tokens Spent (Total):[/cyan] {total_tokens:,}",
+        f"[magenta]💲 Total Approx. Cost:[/magenta] ${total_cost:.4f}",
+    )
+
+    return Panel(Group(progress, tbl),
+                 title="✦ ✧ CLAUDE CODE USAGE MONITOR ✦ ✧",
+                 border_style="green")
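+
+# A sketch of how per-model rates could replace the flat rate above
+# (model names and numbers here are placeholders, not real pricing):
+#   RATES = {"claude-sonnet-4-5": 3e-6, "claude-haiku": 1e-6}
+#   total_cost = sum(
+#       (e.input_tokens + e.output_tokens) * RATES.get(e.model, 2e-6)
+#       for e in entries
+#   )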
+
+
+def render_overlay():
+    """Recent messages overlay showing a prompt snippet per entry."""
+    entries, _ = load_usage_entries()
+    if not entries:
+        return Panel("⚠️ No token data yet.",
+                     title="🧠 Recent Messages",
+                     border_style="yellow")
+
+    total_entries = len(entries)
+    start = max(1, total_entries - last_n_display + 1)  # keep numbering >= 1
+    recent = list(enumerate(entries[-last_n_display:], start=start))
+    raw_prompts = load_recent_raw_prompts(limit=last_n_display * 2)
+
+    msgs = []
+    for idx, e in recent:
+        tokens_used = e.input_tokens + e.output_tokens
+        t = e.timestamp.strftime("%H:%M:%S")
+        model = e.model
+        keyword = getattr(e, "keyword", None)
+        keyword_text = f" - {keyword}" if keyword else ""
+
+        # Find a snippet: prefer text on the entry, else the raw prompt
+        # closest in time (normalize the timestamp so aware-minus-naive
+        # subtraction cannot raise).
+        raw_text = getattr(e, "text", "") or getattr(e, "raw", "")
+        if not raw_text and raw_prompts:
+            ets = e.timestamp if e.timestamp.tzinfo else e.timestamp.replace(tzinfo=timezone.utc)
+            closest = min(
+                raw_prompts,
+                key=lambda x: abs((x["timestamp"] - ets).total_seconds()),
+                default=None,
+            )
+            if closest:
+                raw_text = closest["text"]
+
+        preview = ""
+        if raw_text:
+            words = raw_text.strip().split()
+            snippet = " ".join(words[:8])
+            preview = f' [dim italic]– “{snippet}{"..." if len(words) > 8 else ""}”[/dim italic]'
+
+        msgs.append(f"{idx}. [bold]{t}[/bold] | {tokens_used} tokens ({model}){keyword_text}{preview}")
+
+    total = sum(e.input_tokens + e.output_tokens for _, e in recent)
+    avg = total / len(recent)
+    lines = "\n".join(msgs + [
+        "─" * 45,
+        f"Total: {total} | Avg/msg: {avg:.1f}",
+        "[dim]Press 1=3 msgs • 2=15 msgs • 3=30 msgs • Q=Quit[/dim]",
+    ])
+    return Panel(lines, title=f"🧠 Recent Message Tokens (last {last_n_display})", border_style="cyan")
+
+
+def render_combined_layout():
+    return Group(render_usage_panel(), render_overlay())
+
+
+# ─────────────────────────────────────────────
+# Main loop
+# ─────────────────────────────────────────────
+def main():
+    global stop_listen
+    console.clear()
+    console.print("[dim]Claude Code Usage Monitor running — press 1 | 2 | 3 to change view, Q to quit[/dim]\n")
+
+    listener = threading.Thread(target=listen_for_keys, daemon=True)
+    listener.start()
+
+    with Live(render_combined_layout(), refresh_per_second=2, console=console) as live:
+        try:
+            while not stop_listen:
+                live.update(render_combined_layout())
+                time.sleep(2)
+        except KeyboardInterrupt:
+            stop_listen = True
+        finally:
+            console.clear()
+            console.print("[red]👋 Exiting Claude Code Usage Monitor.[/red]")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/claude_monitor/data/reader.py b/src/claude_monitor/data/reader.py
index 5aa8e18..1fa68c6 100644
--- a/src/claude_monitor/data/reader.py
+++ b/src/claude_monitor/data/reader.py
@@ -1,13 +1,15 @@
-"""Simplified data reader for Claude Monitor.
-
-Combines functionality from file_reader, filter, mapper, and processor
-into a single cohesive module.
+#!/usr/bin/env python3
+"""
+Simplified data reader for Claude Monitor.
+───────────────────────────────────────────────
+✅ Combines file loading, mapping, and filtering
+✅ Adds full keyword propagation to UsageEntry
+✅ Works seamlessly with the monitor overlay
 """
 
 import json
 import logging
-from datetime import datetime, timedelta
-from datetime import timezone as tz
+from datetime import datetime, timedelta, timezone as tz
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Set, Tuple
@@ -29,24 +31,17 @@
 logger = logging.getLogger(__name__)
 
 
+# ─────────────────────────────────────────────
+# Public: load usage entries
+# ─────────────────────────────────────────────
 def load_usage_entries(
     data_path: Optional[str] = None,
     hours_back: Optional[int] = None,
     mode: CostMode = CostMode.AUTO,
     include_raw: bool = False,
 ) -> Tuple[List[UsageEntry], Optional[List[Dict[str, Any]]]]:
-    """Load and convert JSONL files to UsageEntry objects.
-
-    Args:
-        data_path: Path to Claude data directory (defaults to ~/.claude/projects)
-        hours_back: Only include entries from last N hours
-        mode: Cost calculation mode
-        include_raw: Whether to return raw JSON data alongside entries
-
-    Returns:
-        Tuple of (usage_entries, raw_data) where raw_data is None unless include_raw=True
-    """
-    data_path = Path(data_path if data_path else "~/.claude/projects").expanduser()
+    """Load and convert JSONL files to UsageEntry objects."""
+    data_path = Path(data_path or "~/.claude/projects").expanduser()
 
     timezone_handler = TimezoneHandler()
     pricing_calculator = PricingCalculator()
@@ -78,25 +73,19 @@ def load_usage_entries(
             raw_entries.extend(raw_data)
 
     all_entries.sort(key=lambda e: e.timestamp)
-
-    logger.info(f"Processed {len(all_entries)} entries from {len(jsonl_files)} files")
-
+    logger.info("Processed %d entries from %d files", len(all_entries), len(jsonl_files))
     return all_entries, raw_entries
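+
+# Example call (signature and fields as defined above):
+#   entries, raw = load_usage_entries(hours_back=24, include_raw=True)
+#   for e in entries[-3:]:
+#       print(e.timestamp, e.model, e.keyword)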
 
 
+# ─────────────────────────────────────────────
+# Public: load raw entries (for debugging)
+# ─────────────────────────────────────────────
 def load_all_raw_entries(data_path: Optional[str] = None) -> List[Dict[str, Any]]:
-    """Load all raw JSONL entries without processing.
-
-    Args:
-        data_path: Path to Claude data directory
-
-    Returns:
-        List of raw JSON dictionaries
-    """
-    data_path = Path(data_path if data_path else "~/.claude/projects").expanduser()
+    """Load all raw JSONL entries without processing."""
+    data_path = Path(data_path or "~/.claude/projects").expanduser()
     jsonl_files = _find_jsonl_files(data_path)
-
     all_raw_entries: List[Dict[str, Any]] = []
+
     for file_path in jsonl_files:
         try:
             with open(file_path, encoding="utf-8") as f:
@@ -109,11 +98,13 @@ def load_all_raw_entries(data_path: Optional[str] = None) -> List[Dict[str, Any]]:
                     except json.JSONDecodeError:
                         continue
         except Exception as e:
-            logger.exception(f"Error loading raw entries from {file_path}: {e}")
-
+            logger.exception("Error loading raw entries from %s: %s", file_path, e)
     return all_raw_entries
 
 
+# ─────────────────────────────────────────────
+# Internal: file handling and filtering
+# ─────────────────────────────────────────────
 def _find_jsonl_files(data_path: Path) -> List[Path]:
     """Find all .jsonl files in the data directory."""
     if not data_path.exists():
@@ -131,50 +122,66 @@ def _process_single_file(
     timezone_handler: TimezoneHandler,
     pricing_calculator: PricingCalculator,
 ) -> Tuple[List[UsageEntry], Optional[List[Dict[str, Any]]]]:
-    """Process a single JSONL file."""
+    """Process a single JSONL file into UsageEntry objects."""
     entries: List[UsageEntry] = []
     raw_data: Optional[List[Dict[str, Any]]] = [] if include_raw else None
 
     try:
-        entries_read = 0
-        entries_filtered = 0
-        entries_mapped = 0
+        # First pass: build a map of user messages by UUID
+        user_messages = {}
+        all_lines = []
 
         with open(file_path, encoding="utf-8") as f:
             for line in f:
                 line = line.strip()
                 if not line:
                     continue
-
                 try:
                     data = json.loads(line)
-                    entries_read += 1
-
-                    if not _should_process_entry(
-                        data, cutoff_time, processed_hashes, timezone_handler
-                    ):
-                        entries_filtered += 1
-                        continue
-
-                    entry = _map_to_usage_entry(
-                        data, mode, timezone_handler, pricing_calculator
-                    )
-                    if entry:
-                        entries_mapped += 1
-                        entries.append(entry)
-                        _update_processed_hashes(data, processed_hashes)
-
-                        if include_raw:
-                            raw_data.append(data)
+                    all_lines.append(data)
+
+                    # Store user messages for later lookup
+                    if data.get("type") == "user":
+                        uuid = data.get("uuid")
+                        message = data.get("message", {})
+                        content = message.get("content", "")
+
+                        # Extract text from the user message
+                        text = ""
+                        if isinstance(content, str):
+                            text = content
+                        elif isinstance(content, list):
+                            for block in content:
+                                if isinstance(block, dict) and block.get("type") == "text":
+                                    text = block.get("text", "")
+                                    break
+
+                        if uuid and text:
+                            user_messages[uuid] = text
+
+                except json.JSONDecodeError:
+                    continue
-
-                except json.JSONDecodeError as e:
-                    logger.debug(f"Failed to parse JSON line in {file_path}: {e}")
-                    continue
-
-        logger.debug(
-            f"File {file_path.name}: {entries_read} read, "
-            f"{entries_filtered} filtered out, {entries_mapped} successfully mapped"
-        )
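+
+        # The two-pass link below assumes records shaped like this pair
+        # (fields mirror the lookups above; actual logs may carry more keys):
+        #   {"type": "user", "uuid": "u-1",
+        #    "message": {"content": "fix the login page"}}
+        #   {"type": "assistant", "parentUuid": "u-1",
+        #    "message": {"id": "msg_1"}, "requestId": "req_1"}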
message.get("content", "") + + # Extract text from user message + text = "" + if isinstance(content, str): + text = content + elif isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + text = block.get("text", "") + break + + if uuid and text: + user_messages[uuid] = text + + except json.JSONDecodeError: + continue - except json.JSONDecodeError as e: - logger.debug(f"Failed to parse JSON line in {file_path}: {e}") + # Second pass: process assistant messages and link to user messages + for data in all_lines: + try: + if not _should_process_entry(data, cutoff_time, processed_hashes, timezone_handler): continue - logger.debug( - f"File {file_path.name}: {entries_read} read, " - f"{entries_filtered} filtered out, {entries_mapped} successfully mapped" - ) + # Add user text to assistant message if available + if data.get("type") == "assistant": + parent_uuid = data.get("parentUuid") + if parent_uuid and parent_uuid in user_messages: + data["_user_text"] = user_messages[parent_uuid] + + entry = _map_to_usage_entry(data, mode, timezone_handler, pricing_calculator) + if entry: + entries.append(entry) + _update_processed_hashes(data, processed_hashes) + if include_raw: + raw_data.append(data) + except json.JSONDecodeError: + continue except Exception as e: logger.warning("Failed to read file %s: %s", file_path, e) @@ -203,7 +210,6 @@ def _should_process_entry( timestamp = processor.parse_timestamp(timestamp_str) if timestamp and timestamp < cutoff_time: return False - unique_hash = _create_unique_hash(data) return not (unique_hash and unique_hash in processed_hashes) @@ -211,12 +217,9 @@ def _should_process_entry( def _create_unique_hash(data: Dict[str, Any]) -> Optional[str]: """Create unique hash for deduplication.""" message_id = data.get("message_id") or ( - data.get("message", {}).get("id") - if isinstance(data.get("message"), dict) - else None + data.get("message", {}).get("id") if isinstance(data.get("message"), dict) else None ) request_id = data.get("requestId") or data.get("request_id") - return f"{message_id}:{request_id}" if message_id and request_id else None @@ -227,13 +230,16 @@ def _update_processed_hashes(data: Dict[str, Any], processed_hashes: Set[str]) - processed_hashes.add(unique_hash) +# ───────────────────────────────────────────── +# Mapping: raw JSON to UsageEntry +# ───────────────────────────────────────────── def _map_to_usage_entry( data: Dict[str, Any], mode: CostMode, timezone_handler: TimezoneHandler, pricing_calculator: PricingCalculator, ) -> Optional[UsageEntry]: - """Map raw data to UsageEntry with proper cost calculation.""" + """Map raw data to UsageEntry with full keyword and text support.""" try: timestamp_processor = TimestampProcessor(timezone_handler) timestamp = timestamp_processor.parse_timestamp(data.get("timestamp", "")) @@ -245,7 +251,6 @@ def _map_to_usage_entry( return None model = DataConverter.extract_model_name(data, default="unknown") - entry_data: Dict[str, Any] = { FIELD_MODEL: model, TOKEN_INPUT: token_data["input_tokens"], @@ -254,13 +259,37 @@ def _map_to_usage_entry( "cache_read_tokens": token_data.get("cache_read_tokens", 0), FIELD_COST_USD: data.get("cost") or data.get(FIELD_COST_USD), } + cost_usd = pricing_calculator.calculate_cost_for_entry(entry_data, mode) message = data.get("message", {}) message_id = data.get("message_id") or message.get("id") or "" request_id = data.get("request_id") or data.get("requestId") or "unknown" - return UsageEntry( + # Extract keyword and text 
@@ -227,13 +230,16 @@ def _update_processed_hashes(
     processed_hashes.add(unique_hash)
 
 
+# ─────────────────────────────────────────────
+# Mapping: raw JSON to UsageEntry
+# ─────────────────────────────────────────────
 def _map_to_usage_entry(
     data: Dict[str, Any],
     mode: CostMode,
     timezone_handler: TimezoneHandler,
     pricing_calculator: PricingCalculator,
 ) -> Optional[UsageEntry]:
-    """Map raw data to UsageEntry with proper cost calculation."""
+    """Map raw data to UsageEntry with full keyword and text support."""
     try:
         timestamp_processor = TimestampProcessor(timezone_handler)
         timestamp = timestamp_processor.parse_timestamp(data.get("timestamp", ""))
@@ -245,7 +251,6 @@ def _map_to_usage_entry(
             return None
 
         model = DataConverter.extract_model_name(data, default="unknown")
-
        entry_data: Dict[str, Any] = {
             FIELD_MODEL: model,
             TOKEN_INPUT: token_data["input_tokens"],
@@ -254,13 +259,37 @@ def _map_to_usage_entry(
             "cache_read_tokens": token_data.get("cache_read_tokens", 0),
             FIELD_COST_USD: data.get("cost") or data.get(FIELD_COST_USD),
         }
+
         cost_usd = pricing_calculator.calculate_cost_for_entry(entry_data, mode)
 
         message = data.get("message", {})
         message_id = data.get("message_id") or message.get("id") or ""
         request_id = data.get("request_id") or data.get("requestId") or "unknown"
 
-        return UsageEntry(
+        # Extract keyword and text from message content
+        keyword = data.get("keyword")
+        text = data.get("_user_text", "")  # use linked user text if available
+
+        # If no user text, try extracting from message content directly
+        if not text:
+            content = message.get("content", "")
+            if isinstance(content, str):
+                text = content
+            elif isinstance(content, list):
+                # Take the first text content block
+                for block in content:
+                    if isinstance(block, dict) and block.get("type") == "text":
+                        text = block.get("text", "")
+                        break
+
+        # Generate a keyword from the text if not already set (first 3-5 words)
+        if not keyword and text:
+            words = text.strip().split()
+            keyword = " ".join(words[:5]) if len(words) <= 5 else " ".join(words[:3])
+            if len(keyword) > 40:
+                keyword = keyword[:37] + "..."
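+
+        # e.g. text "please fix the login page redirect bug" (7 words) gives
+        # keyword "please fix the"; "fix login bug" (3 words) is kept whole.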
+
+        entry = UsageEntry(
             timestamp=timestamp,
             input_tokens=token_data["input_tokens"],
             output_tokens=token_data["output_tokens"],
@@ -270,51 +299,46 @@ def _map_to_usage_entry(
             model=model,
             message_id=message_id,
             request_id=request_id,
+            keyword=keyword,
         )
 
+        # Attach the text field dynamically; plain dataclasses accept new
+        # attributes, slotted ones would not
+        if hasattr(entry, "__dict__"):
+            entry.text = text
+
+        return entry
+
     except (KeyError, ValueError, TypeError, AttributeError) as e:
-        logger.debug(f"Failed to map entry: {type(e).__name__}: {e}")
+        logger.debug("Failed to map entry: %s: %s", type(e).__name__, e)
         return None
 
 
+# ─────────────────────────────────────────────
+# Compatibility wrapper for legacy interfaces
+# ─────────────────────────────────────────────
 class UsageEntryMapper:
-    """Compatibility wrapper for legacy UsageEntryMapper interface.
-
-    This class provides backward compatibility for tests that expect
-    the old UsageEntryMapper interface, wrapping the new functional
-    approach in _map_to_usage_entry.
-    """
+    """Compatibility wrapper for legacy UsageEntryMapper interface."""
 
-    def __init__(
-        self, pricing_calculator: PricingCalculator, timezone_handler: TimezoneHandler
-    ):
-        """Initialize with required components."""
+    def __init__(self, pricing_calculator: PricingCalculator, timezone_handler: TimezoneHandler):
         self.pricing_calculator = pricing_calculator
         self.timezone_handler = timezone_handler
 
     def map(self, data: Dict[str, Any], mode: CostMode) -> Optional[UsageEntry]:
-        """Map raw data to UsageEntry - compatibility interface."""
-        return _map_to_usage_entry(
-            data, mode, self.timezone_handler, self.pricing_calculator
-        )
+        return _map_to_usage_entry(data, mode, self.timezone_handler, self.pricing_calculator)
 
     def _has_valid_tokens(self, tokens: Dict[str, int]) -> bool:
-        """Check if tokens are valid (for test compatibility)."""
         return any(v > 0 for v in tokens.values())
 
     def _extract_timestamp(self, data: Dict[str, Any]) -> Optional[datetime]:
-        """Extract timestamp (for test compatibility)."""
         if "timestamp" not in data:
             return None
         processor = TimestampProcessor(self.timezone_handler)
         return processor.parse_timestamp(data["timestamp"])
 
     def _extract_model(self, data: Dict[str, Any]) -> str:
-        """Extract model name (for test compatibility)."""
         return DataConverter.extract_model_name(data, default="unknown")
 
     def _extract_metadata(self, data: Dict[str, Any]) -> Dict[str, str]:
-        """Extract metadata (for test compatibility)."""
         message = data.get("message", {})
         return {
             "message_id": data.get("message_id") or message.get("id", ""),
diff --git a/src/claude_monitor/data/writer.py b/src/claude_monitor/data/writer.py
new file mode 100644
index 0000000..692b6c4
--- /dev/null
+++ b/src/claude_monitor/data/writer.py
@@ -0,0 +1,141 @@
+#!/usr/bin/env python3
+"""
+Claude Monitor Data Writer
+───────────────────────────────────────────────
+Handles usage logging and keyword tagging for the Claude Code Usage Monitor.
+
+Features:
+✅ Logs model usage entries with metadata
+✅ Adds or updates keywords for existing messages
+✅ Compatible with both ~/.claude/projects and ~/.claude/code paths
+"""
+
+import json
+import logging
+import os
+from datetime import datetime
+from pathlib import Path
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+# Default log file search paths (covers both Claude Desktop and Claude Code)
+USAGE_PATHS = [
+    os.path.expanduser("~/.claude/projects/usage.jsonl"),
+    os.path.expanduser("~/.claude/code/usage/usage.jsonl"),
+    os.path.expanduser("~/Library/Application Support/Claude/code/usage/usage.jsonl"),
+]
+
+
+# ─────────────────────────────────────────────
+# Log a new usage entry
+# ─────────────────────────────────────────────
+def log_usage_entry(
+    timestamp: datetime,
+    model: str,
+    input_tokens: int,
+    output_tokens: int,
+    keyword: Optional[str] = None,
+    cache_creation_tokens: int = 0,
+    cache_read_tokens: int = 0,
+    cost_usd: float = 0.0,
+    message_id: str = "",
+    request_id: str = "",
+    log_path: Optional[str] = None,
+) -> bool:
+    """
+    Log a usage entry with an optional keyword summary.
+
+    Example:
+        log_usage_entry(
+            timestamp=datetime.now(),
+            model="claude-sonnet-4-5",
+            input_tokens=300,
+            output_tokens=150,
+            keyword="site fix",
+        )
+    """
+    if log_path is None:
+        # Prefer the first search path whose directory exists,
+        # falling back to ~/.claude/projects
+        log_path = next(
+            (Path(p) for p in USAGE_PATHS if os.path.exists(os.path.dirname(p))),
+            Path(USAGE_PATHS[0]),
+        )
+    else:
+        log_path = Path(log_path)  # accept str or Path from callers
+
+    log_path.parent.mkdir(parents=True, exist_ok=True)
+
+    entry = {
+        "timestamp": timestamp.isoformat(),
+        "model": model,
+        "input_tokens": input_tokens,
+        "output_tokens": output_tokens,
+        "cache_creation_tokens": cache_creation_tokens,
+        "cache_read_tokens": cache_read_tokens,
+        "cost_usd": cost_usd,
+        "message_id": message_id,
+        "request_id": request_id,
+    }
+
+    if keyword:
+        entry["keyword"] = keyword
+
+    try:
+        with open(log_path, "a", encoding="utf-8") as f:
+            f.write(json.dumps(entry) + "\n")
+        logger.info("✅ Logged usage entry (%s) with keyword: %s", message_id, keyword or "(none)")
+        return True
+    except OSError as e:
+        logger.error("❌ Failed to write usage entry: %s", e)
+        return False
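+
+# A line appended by log_usage_entry looks like this (abridged):
+#   {"timestamp": "2025-01-01T12:00:00", "model": "claude-sonnet-4-5",
+#    "input_tokens": 300, "output_tokens": 150, "keyword": "site fix", ...}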
+
+
+# ─────────────────────────────────────────────
+# Add or update keyword on existing entries
+# ─────────────────────────────────────────────
+def add_keyword_to_existing_entry(message_id: str, keyword: str) -> bool:
+    """
+    Add or update a keyword for a specific message in any valid usage.jsonl file.
+
+    Example:
+        add_keyword_to_existing_entry("msg_12345", "login error")
+    """
+    # Find whichever usage log actually exists
+    path = next((p for p in USAGE_PATHS if os.path.exists(p)), None)
+    if not path:
+        logger.error("❌ No usage log file found in known paths.")
+        return False
+
+    lines = []
+    found = False
+
+    # Read all entries, keeping unparseable lines verbatim so the
+    # rewrite never silently drops data
+    try:
+        with open(path, "r", encoding="utf-8") as f:
+            for line in f:
+                if not line.strip():
+                    continue
+                try:
+                    entry = json.loads(line)
+                except json.JSONDecodeError:
+                    lines.append(line.rstrip("\n"))
+                    continue
+                entry_msg_id = entry.get("message_id") or (
+                    entry.get("message", {}).get("id") if isinstance(entry.get("message"), dict) else None
+                )
+                if entry_msg_id == message_id:
+                    entry["keyword"] = keyword
+                    found = True
+                lines.append(entry)
+    except FileNotFoundError:
+        logger.error("❌ Log file not found.")
+        return False
+
+    if not found:
+        logger.warning("⚠️ Message ID %s not found in log.", message_id)
+        return False
+
+    # Rewrite safely: write a temp file first, then swap it into place
+    tmp_path = path + ".tmp"
+    try:
+        with open(tmp_path, "w", encoding="utf-8") as f:
+            for e in lines:
+                f.write(e if isinstance(e, str) else json.dumps(e))
+                f.write("\n")
+        os.replace(tmp_path, path)
+        logger.info("✅ Added keyword '%s' to message %s", keyword, message_id)
+        return True
+    except OSError as e:
+        logger.error("❌ Failed to update log: %s", e)
+        return False
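+
+
+if __name__ == "__main__":
+    # Minimal smoke test against a throwaway log; the IDs are hypothetical.
+    # Note: add_keyword_to_existing_entry only searches USAGE_PATHS, so tagging
+    # this demo entry would require writing to one of those real locations.
+    log_usage_entry(
+        timestamp=datetime.now(),
+        model="claude-sonnet-4-5",
+        input_tokens=300,
+        output_tokens=150,
+        keyword="site fix",
+        message_id="msg_demo_001",
+        log_path="/tmp/claude_monitor_demo.jsonl",
+    )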