-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_parser.py
More file actions
330 lines (269 loc) · 12.3 KB
/
Copy pathlog_parser.py
File metadata and controls
330 lines (269 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
#!/usr/bin/env python3
import sys
import time
import json
import argparse
import requests
import re
from datetime import datetime
from collections import deque
import os
# Chat endpoint of the Ollama server. Override with the OLLAMA_URL
# environment variable; falls back to a local default.
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434/api/chat")

# Model name used as the default for the --model command-line option.
DEFAULT_MODEL = "qwen2.5-coder:1.5b"

# System prompt sent with every chat request. {context_clause} is filled in
# with an optional description of the application being monitored.
SYSTEM_PROMPT_TEMPLATE = """You are an expert SRE and application log analyst{context_clause}.
Explain log lines in plain English for an on-call engineer during an incident.
Be concise — 1 to 2 sentences max.
Focus on: what happened, why it might have occurred, and if it looks like an error, what to check next.
Never repeat the raw log line. Just explain it clearly."""

# Fallback regexes per severity, listed from most to least severe. They are
# searched in dict order, so the first severity with a matching pattern wins.
SEVERITY_PATTERNS = {
    "CRITICAL": [r"\bCRITICAL\b", r"\bFATAL\b", r"\bPANIC\b", r"OutOfMemory", r"OOMKilled", r"segfault"],
    # The status code is anchored with \b so it matches a standalone "500"
    # but not digits embedded in a larger number such as "1500" or "5000ms".
    "ERROR": [r"\bERROR\b", r"\bException\b", r"\bTraceback\b", r"\bFailed\b", r"\bfailed\b", r"\b500\b"],
    "WARN": [r"\bWARN\b", r"\bWARNING\b", r"\bDeprecated\b", r"\bRetrying\b", r"\bretry\b", r"timeout"],
    "INFO": [r"\bINFO\b", r"\bStarted\b", r"\bStopped\b", r"\bConnected\b", r"\bListening\b"],
}

# ANSI escape sequences used to colour the severity tag of each printed line.
SEVERITY_COLORS = {
    "CRITICAL": "\033[1;35m",  # bold magenta
    "ERROR": "\033[1;31m",  # bold red
    "WARN": "\033[1;33m",  # bold yellow
    "INFO": "\033[1;34m",  # bold blue
    "UNKNOWN": "\033[0;37m",  # grey
}

# ANSI escape sequences for resetting and for dim / bold text.
RESET = "\033[0m"
DIM = "\033[2m"
BOLD = "\033[1m"
def classify_severity(line: str) -> str:
    """Return the severity label for a single log line.

    The result is one of "CRITICAL", "ERROR", "WARN", "INFO" or "UNKNOWN".
    Classification runs in two passes:

    1. A case-insensitive substring search for an explicit severity keyword,
       checked from most to least severe. This keeps a line such as
       "WARNING ... lag=1500" from being labelled ERROR merely because a
       fallback pattern happens to match somewhere in the message body.
    2. If no keyword is present, the regexes in SEVERITY_PATTERNS are tried
       in order (useful for bare tracebacks, OOM messages and the like).
    """
    explicit_levels = (
        ("CRITICAL", ("CRITICAL", "FATAL", "PANIC")),
        ("ERROR", ("ERROR",)),
        ("WARN", ("WARNING", "WARN")),
        ("INFO", ("INFO",)),
    )
    shouted = line.upper()
    for label, words in explicit_levels:
        if any(word in shouted for word in words):
            return label

    # No explicit level word: fall back to the broader regex table.
    for label, regexes in SEVERITY_PATTERNS.items():
        if any(re.search(rx, line) for rx in regexes):
            return label

    return "UNKNOWN"
def explain_log_line(line: str, model: str, context: str = "") -> str:
    """Ask Ollama to explain a log line, echoing the answer as it streams.

    Args:
        line: Raw log line; surrounding whitespace is stripped first.
        model: Name of the Ollama model to query.
        context: Optional description of the application, woven into the
            system prompt when non-empty.

    Returns:
        The concatenated explanation text. An empty string is returned for a
        blank line. If the request fails, an "[ERROR] ..." message is printed
        and whatever was received up to that point is returned.
    """
    text = line.strip()
    if not text:
        return ""

    if context:
        context_clause = f" familiar with {context} applications"
    else:
        context_clause = ""

    user_message = (
        "Explain this application log line in plain English. "
        "Keep it concise — 1-2 sentences maximum. "
        "Focus on what it means and whether action is needed.:\n\n" + text
    )
    request_body = {
        "model": model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT_TEMPLATE.format(context_clause=context_clause)},
            {"role": "user", "content": user_message},
        ],
        "stream": True,
    }

    pieces = []
    try:
        # Prefix printed before the streamed tokens.
        print(f" {DIM}↳{RESET} ", end="", flush=True)
        with requests.post(OLLAMA_URL, json=request_body, timeout=30, stream=True) as response:
            response.raise_for_status()
            for raw in response.iter_lines():
                if not raw:
                    continue  # skip empty lines yielded by iter_lines
                event = json.loads(raw)
                piece = event.get("message", {}).get("content", "")
                print(piece, end="", flush=True)
                pieces.append(piece)
                if event.get("done"):
                    break
        print()  # terminate the streamed line
    except requests.exceptions.ConnectionError:
        print("[ERROR] Cannot connect to Ollama. Is it running? Try: ollama serve")
    except requests.exceptions.Timeout:
        print("[ERROR] Ollama timed out. Model may still be loading.")
    except Exception as exc:
        print(f"[ERROR] {exc!s}")

    return "".join(pieces)
class PatternDetector:
"""Detects repeated error patterns within a sliding time window."""
def __init__(self, window_seconds: int = 60, threshold: int = 5):
self.window_seconds = window_seconds
self.threshold = threshold
self.buckets: dict[str, deque] = {}
self.alerted: set[str] = set()
def _normalize(self, line: str) -> str:
# Strip timestamps, IDs, and numbers to find structural patterns
line = re.sub(r"\d{4}-\d{2}-\d{2}[\sT]\d{2}:\d{2}:\d{2}[\.,]?\d*", "", line)
line = re.sub(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", "<uuid>", line)
line = re.sub(r"\b\d+\b", "<n>", line)
return line.strip()
def check(self, line: str) -> int | None:
"""Returns count if threshold crossed, else None."""
key = self._normalize(line)
now = time.time()
if key not in self.buckets:
self.buckets[key] = deque()
dq = self.buckets[key]
dq.append(now)
# Evict old entries outside the window
while dq and dq[0] < now - self.window_seconds:
dq.popleft()
count = len(dq)
if count >= self.threshold and key not in self.alerted:
self.alerted.add(key)
return count
# Reset alert if count drops back below threshold
if count < self.threshold and key in self.alerted:
self.alerted.discard(key)
return None
class IncidentSummarizer:
"""Tracks recent errors and produces incident summaries."""
def __init__(self, window_seconds: int = 120, spike_threshold: int = 10):
self.window_seconds = window_seconds
self.spike_threshold = spike_threshold
self.error_times: deque = deque()
self.error_lines: deque = deque()
self.last_summary_at: float = 0
def record(self, line: str, severity: str):
if severity in ("ERROR", "CRITICAL"):
now = time.time()
self.error_times.append(now)
self.error_lines.append(line.strip())
# Evict old entries — pop from both deques together to stay in sync
while self.error_times and self.error_times[0] < now - self.window_seconds:
self.error_times.popleft()
self.error_lines.popleft()
def should_summarize(self) -> bool:
now = time.time()
if (len(self.error_times) >= self.spike_threshold and
now - self.last_summary_at > self.window_seconds):
self.last_summary_at = now
return True
return False
def get_summary_prompt(self) -> str:
lines = list(self.error_lines)[-20:]
joined = "\n".join(lines)
return (
f"You are an SRE. The following {len(self.error_times)} errors occurred in the last "
f"{self.window_seconds} seconds. Summarize what is likely going wrong in 2-3 sentences "
f"and suggest one immediate action.\n\nErrors:\n{joined}"
)
def print_separator():  # pragma: no cover
    """Print a dim horizontal rule, 60 characters wide, to stdout."""
    rule = "─" * 60
    print(f"{DIM}{rule}{RESET}")
def tail_file(filepath: str, model: str, context: str, min_severity: str, elk_writer=None):  # pragma: no cover
    """Follow a log file and print an explanation for each new line.

    Runs until interrupted. Every non-blank line is classified and recorded
    for spike detection; lines at or above ``min_severity`` are printed,
    checked for repetition and explained by the model.

    Args:
        filepath: Path of the file to follow. Seekable files are followed
            from their current end; non-seekable inputs are read as-is.
        model: Ollama model name passed to explain_log_line.
        context: Optional application description passed to explain_log_line.
        min_severity: Lowest severity to display. Values not in the internal
            ordering fall back to showing everything.
        elk_writer: Optional writer object; when given, each explained line
            is passed to its ``write(raw_log=..., explanation=..., severity=...)``.

    Exits the process with status 1 if the file does not exist and with
    status 0 on Ctrl-C.
    """
    severity_order = ["INFO", "UNKNOWN", "WARN", "ERROR", "CRITICAL"]
    min_index = severity_order.index(min_severity) if min_severity in severity_order else 0
    detector = PatternDetector(window_seconds=60, threshold=5)
    summarizer = IncidentSummarizer(window_seconds=120, spike_threshold=10)

    # Matches a leading timestamp so it is not shown next to our own. Handles:
    #   2026-04-12 17:30:04,792 WARNING message
    #   2026-04-12T17:30:04 WARNING message
    # Compiled once here because it is applied to every displayed line.
    leading_timestamp = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}[,.]?\d*\s+")

    print(f"\n{BOLD}Log Explainer{RESET} — SRE Incident Response Tool")
    print(f"{DIM}File: {filepath}{RESET}")
    print(f"{DIM}Model: {model}{RESET}")
    print(f"{DIM}Filter: {min_severity}+{RESET}")
    if context:
        print(f"{DIM}Context: {context}{RESET}")
    if elk_writer:
        print(f"{DIM}ELK: {elk_writer.output_file}{RESET}")
    print_separator()
    print(f"{DIM}Waiting for new log lines...{RESET}\n")

    try:
        # errors="replace": logs can contain bytes that are not valid in the
        # default encoding; substitute them instead of letting a single bad
        # byte raise UnicodeDecodeError and stop the tail.
        with open(filepath, "r", errors="replace") as f:
            # Only seek to end for regular files. stdin/pipes are not seekable
            # and already start at the current position — seeking would drop input.
            if f.seekable():
                f.seek(0, 2)
            while True:
                line = f.readline()
                if not line:
                    # Nothing new yet; poll again shortly.
                    time.sleep(0.3)
                    continue
                line = line.strip()
                if not line:
                    continue

                severity = classify_severity(line)
                # Recorded before filtering so spikes are detected even when
                # the display filter hides some lines.
                summarizer.record(line, severity)

                # Filter by minimum severity
                if severity_order.index(severity) < min_index:
                    continue

                color = SEVERITY_COLORS.get(severity, SEVERITY_COLORS["UNKNOWN"])
                ts = datetime.now().strftime("%H:%M:%S")
                display_line = leading_timestamp.sub("", line)
                print(f"{DIM}{ts}{RESET} {color}[{severity:<8}]{RESET} {display_line}")

                # Pattern spike detection
                repeat_count = detector.check(line)
                if repeat_count:
                    print(
                        f" {SEVERITY_COLORS['WARN']}⚠ Pattern repeated {repeat_count}x in "
                        f"{detector.window_seconds}s — possible recurring issue{RESET}"
                    )

                # Stream explanation token by token, capture return value
                explanation = explain_log_line(line, model, context)

                # Write to ELK if enabled
                if elk_writer and explanation:
                    elk_writer.write(raw_log=line, explanation=explanation, severity=severity)

                # Incident summary on error spikes
                if summarizer.should_summarize():
                    print()
                    print(f"{SEVERITY_COLORS['CRITICAL']}{BOLD}INCIDENT SPIKE DETECTED — generating summary...{RESET}")
                    explain_log_line(summarizer.get_summary_prompt(), model, context)
                    print_separator()
                print()
    except FileNotFoundError:
        print(f"[ERROR] File not found: {filepath}")
        sys.exit(1)
    except KeyboardInterrupt:
        print(f"\n{DIM}[Stopped]{RESET}")
        sys.exit(0)
def main():  # pragma: no cover
    """Command-line entry point: parse the options and start tailing.

    When --elk-output is given, an ELKWriter is created (imported lazily from
    the elk_writer module) and handed to tail_file.
    """
    usage_examples = """
Examples:
python log_parser.py /var/log/myapp.log
python log_parser.py /var/log/myapp.log --model mistral
python log_parser.py /var/log/myapp.log --context "Django REST API" --severity WARN
python log_parser.py /var/log/myapp.log --elk-output
python log_parser.py /var/log/myapp.log --elk-output --elk-file /tmp/explained-logs.jsonl
tail -f /var/log/myapp.log | python log_parser.py /dev/stdin
"""
    cli = argparse.ArgumentParser(
        description="Tail a log file and explain each line in plain English using a local Ollama model.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # (flags, keyword arguments) for each option, in the order shown by --help.
    option_specs = [
        (("logfile",), {"help": "Path to the log file to tail"}),
        (("--model",), {
            "default": DEFAULT_MODEL,
            "help": f"Ollama model to use (default: {DEFAULT_MODEL})",
        }),
        (("--context",), {
            "default": "",
            "help": 'App description e.g. "Django REST API with Postgres"',
        }),
        (("--severity",), {
            "default": "INFO",
            "choices": ["INFO", "WARN", "ERROR", "CRITICAL"],
            "help": "Minimum severity to display (default: INFO)",
        }),
        (("--elk-output",), {
            "action": "store_true",
            "help": "Write explained logs as JSON to a file for Filebeat/Elasticsearch ingestion",
        }),
        (("--elk-file",), {
            "default": "explained-logs.jsonl",
            "help": "Output file for ELK JSON documents (default: explained-logs.jsonl)",
        }),
    ]
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)

    opts = cli.parse_args()

    writer = None
    if opts.elk_output:
        # Imported only on demand so the module is not needed otherwise.
        from elk_writer import ELKWriter
        writer = ELKWriter(output_file=opts.elk_file, model=opts.model)

    tail_file(opts.logfile, opts.model, opts.context, opts.severity, elk_writer=writer)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()