mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-04-06 04:13:48 +00:00
391 lines
12 KiB
Python
391 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
AI Context Enrichment Module
|
|
|
|
Enriches notification context with additional information to help AI provide
|
|
more accurate and helpful responses:
|
|
|
|
1. Event frequency - how often this error has occurred
|
|
2. System uptime - helps distinguish startup issues from runtime failures
|
|
3. SMART disk data - for disk-related errors
|
|
4. Known error matching - from proxmox_known_errors database
|
|
|
|
Author: MacRimi
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Any
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
# Import known errors database
|
|
try:
|
|
from proxmox_known_errors import get_error_context, find_matching_error
|
|
except ImportError:
|
|
def get_error_context(*args, **kwargs):
|
|
return None
|
|
def find_matching_error(*args, **kwargs):
|
|
return None
|
|
|
|
DB_PATH = Path('/usr/local/share/proxmenux/health_monitor.db')
|
|
|
|
|
|
def get_system_uptime() -> str:
|
|
"""Get system uptime in human-readable format.
|
|
|
|
Returns:
|
|
String like "2 minutes (recently booted)" or "89 days, 4 hours (stable system)"
|
|
"""
|
|
try:
|
|
with open('/proc/uptime', 'r') as f:
|
|
uptime_seconds = float(f.readline().split()[0])
|
|
|
|
days = int(uptime_seconds // 86400)
|
|
hours = int((uptime_seconds % 86400) // 3600)
|
|
minutes = int((uptime_seconds % 3600) // 60)
|
|
|
|
# Build human-readable string
|
|
parts = []
|
|
if days > 0:
|
|
parts.append(f"{days} day{'s' if days != 1 else ''}")
|
|
if hours > 0:
|
|
parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
|
|
if not parts: # Less than an hour
|
|
parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
|
|
|
|
uptime_str = ", ".join(parts)
|
|
|
|
# Add context hint
|
|
if uptime_seconds < 600: # Less than 10 minutes
|
|
return f"{uptime_str} (just booted - likely startup issue)"
|
|
elif uptime_seconds < 3600: # Less than 1 hour
|
|
return f"{uptime_str} (recently booted)"
|
|
elif days >= 30:
|
|
return f"{uptime_str} (stable system)"
|
|
else:
|
|
return uptime_str
|
|
|
|
except Exception:
|
|
return "unknown"
|
|
|
|
|
|
def get_event_frequency(error_id: str = None, error_key: str = None,
|
|
category: str = None, hours: int = 24) -> Optional[Dict[str, Any]]:
|
|
"""Get frequency information for an error from the database.
|
|
|
|
Args:
|
|
error_id: Specific error ID to look up
|
|
error_key: Alternative error key
|
|
category: Error category
|
|
hours: Time window to check (default 24h)
|
|
|
|
Returns:
|
|
Dict with frequency info or None
|
|
"""
|
|
if not DB_PATH.exists():
|
|
return None
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(DB_PATH), timeout=5)
|
|
cursor = conn.cursor()
|
|
|
|
# Try to find the error
|
|
if error_id:
|
|
cursor.execute('''
|
|
SELECT first_seen, last_seen, occurrences, category
|
|
FROM errors WHERE error_key = ? OR error_id = ?
|
|
ORDER BY last_seen DESC LIMIT 1
|
|
''', (error_id, error_id))
|
|
elif error_key:
|
|
cursor.execute('''
|
|
SELECT first_seen, last_seen, occurrences, category
|
|
FROM errors WHERE error_key = ?
|
|
ORDER BY last_seen DESC LIMIT 1
|
|
''', (error_key,))
|
|
elif category:
|
|
cursor.execute('''
|
|
SELECT first_seen, last_seen, occurrences, category
|
|
FROM errors WHERE category = ? AND resolved_at IS NULL
|
|
ORDER BY last_seen DESC LIMIT 1
|
|
''', (category,))
|
|
else:
|
|
conn.close()
|
|
return None
|
|
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if not row:
|
|
return None
|
|
|
|
first_seen, last_seen, occurrences, cat = row
|
|
|
|
# Calculate age
|
|
try:
|
|
first_dt = datetime.fromisoformat(first_seen) if first_seen else None
|
|
last_dt = datetime.fromisoformat(last_seen) if last_seen else None
|
|
now = datetime.now()
|
|
|
|
result = {
|
|
'occurrences': occurrences or 1,
|
|
'category': cat
|
|
}
|
|
|
|
if first_dt:
|
|
age = now - first_dt
|
|
if age.total_seconds() < 3600:
|
|
result['first_seen_ago'] = f"{int(age.total_seconds() / 60)} minutes ago"
|
|
elif age.total_seconds() < 86400:
|
|
result['first_seen_ago'] = f"{int(age.total_seconds() / 3600)} hours ago"
|
|
else:
|
|
result['first_seen_ago'] = f"{age.days} days ago"
|
|
|
|
if last_dt and first_dt and occurrences and occurrences > 1:
|
|
# Calculate average interval
|
|
span = (last_dt - first_dt).total_seconds()
|
|
if span > 0 and occurrences > 1:
|
|
avg_interval = span / (occurrences - 1)
|
|
if avg_interval < 60:
|
|
result['pattern'] = f"recurring every ~{int(avg_interval)} seconds"
|
|
elif avg_interval < 3600:
|
|
result['pattern'] = f"recurring every ~{int(avg_interval / 60)} minutes"
|
|
else:
|
|
result['pattern'] = f"recurring every ~{int(avg_interval / 3600)} hours"
|
|
|
|
return result
|
|
|
|
except (ValueError, TypeError):
|
|
return {'occurrences': occurrences or 1, 'category': cat}
|
|
|
|
except Exception as e:
|
|
print(f"[AIContext] Error getting frequency: {e}")
|
|
return None
|
|
|
|
|
|
def get_smart_data(disk_device: str) -> Optional[str]:
|
|
"""Get SMART health data for a disk.
|
|
|
|
Args:
|
|
disk_device: Device path like /dev/sda or just sda
|
|
|
|
Returns:
|
|
Formatted SMART summary or None
|
|
"""
|
|
if not disk_device:
|
|
return None
|
|
|
|
# Normalize device path
|
|
if not disk_device.startswith('/dev/'):
|
|
disk_device = f'/dev/{disk_device}'
|
|
|
|
# Check device exists
|
|
if not os.path.exists(disk_device):
|
|
return None
|
|
|
|
try:
|
|
# Get health status
|
|
result = subprocess.run(
|
|
['smartctl', '-H', disk_device],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
|
|
health_status = "UNKNOWN"
|
|
if "PASSED" in result.stdout:
|
|
health_status = "PASSED"
|
|
elif "FAILED" in result.stdout:
|
|
health_status = "FAILED"
|
|
|
|
# Get key attributes
|
|
result = subprocess.run(
|
|
['smartctl', '-A', disk_device],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
|
|
attributes = {}
|
|
critical_attrs = [
|
|
'Reallocated_Sector_Ct', 'Current_Pending_Sector',
|
|
'Offline_Uncorrectable', 'UDMA_CRC_Error_Count',
|
|
'Reallocated_Event_Count', 'Reported_Uncorrect'
|
|
]
|
|
|
|
for line in result.stdout.split('\n'):
|
|
for attr in critical_attrs:
|
|
if attr in line:
|
|
parts = line.split()
|
|
# Typical format: ID ATTRIBUTE_NAME FLAGS VALUE WORST THRESH TYPE UPDATED RAW_VALUE
|
|
if len(parts) >= 10:
|
|
raw_value = parts[-1]
|
|
attributes[attr] = raw_value
|
|
|
|
# Build summary
|
|
lines = [f"SMART Health: {health_status}"]
|
|
|
|
# Add critical attributes if non-zero
|
|
for attr, value in attributes.items():
|
|
try:
|
|
if int(value) > 0:
|
|
lines.append(f" {attr}: {value}")
|
|
except ValueError:
|
|
pass
|
|
|
|
return "\n".join(lines) if len(lines) > 1 or health_status == "FAILED" else f"SMART Health: {health_status}"
|
|
|
|
except subprocess.TimeoutExpired:
|
|
return None
|
|
except FileNotFoundError:
|
|
# smartctl not installed
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def extract_disk_device(text: str) -> Optional[str]:
|
|
"""Extract disk device name from error text.
|
|
|
|
Args:
|
|
text: Error message or log content
|
|
|
|
Returns:
|
|
Device name like 'sda' or None
|
|
"""
|
|
if not text:
|
|
return None
|
|
|
|
# Common patterns for disk devices in errors
|
|
patterns = [
|
|
r'/dev/(sd[a-z]\d*)',
|
|
r'/dev/(nvme\d+n\d+(?:p\d+)?)',
|
|
r'/dev/(hd[a-z]\d*)',
|
|
r'/dev/(vd[a-z]\d*)',
|
|
r'\b(sd[a-z])\b',
|
|
r'disk[_\s]+(sd[a-z])',
|
|
r'ata\d+\.\d+: (sd[a-z])',
|
|
]
|
|
|
|
for pattern in patterns:
|
|
match = re.search(pattern, text, re.IGNORECASE)
|
|
if match:
|
|
return match.group(1)
|
|
|
|
return None
|
|
|
|
|
|
def enrich_context_for_ai(
|
|
title: str,
|
|
body: str,
|
|
event_type: str,
|
|
data: Dict[str, Any],
|
|
journal_context: str = '',
|
|
detail_level: str = 'standard'
|
|
) -> str:
|
|
"""Build enriched context string for AI processing.
|
|
|
|
Combines:
|
|
- Original journal context
|
|
- Event frequency information
|
|
- System uptime
|
|
- SMART data (for disk errors)
|
|
- Known error matching
|
|
|
|
Args:
|
|
title: Notification title
|
|
body: Notification body
|
|
event_type: Type of event
|
|
data: Event data dict
|
|
journal_context: Original journal log context
|
|
detail_level: Level of detail (minimal, standard, detailed)
|
|
|
|
Returns:
|
|
Enriched context string
|
|
"""
|
|
context_parts = []
|
|
combined_text = f"{title} {body} {journal_context}"
|
|
|
|
# 1. System uptime - ONLY for critical system-level failures
|
|
# Uptime helps distinguish startup issues from runtime failures
|
|
# BUT it's noise for disk errors, warnings, or routine operations
|
|
# Only include for: system crash, kernel panic, OOM, cluster failures
|
|
uptime_critical_types = [
|
|
'crash', 'panic', 'oom', 'kernel',
|
|
'split_brain', 'quorum_lost', 'node_offline', 'node_fail',
|
|
'system_fail', 'boot_fail'
|
|
]
|
|
|
|
# Check if this is a critical system-level event (not disk/service/hardware)
|
|
event_lower = event_type.lower()
|
|
is_critical_system_event = any(t in event_lower for t in uptime_critical_types)
|
|
|
|
# Only add uptime for critical system failures, nothing else
|
|
if is_critical_system_event:
|
|
uptime = get_system_uptime()
|
|
if uptime and uptime != "unknown":
|
|
context_parts.append(f"System uptime: {uptime}")
|
|
|
|
# 2. Event frequency
|
|
error_key = data.get('error_key') or data.get('error_id')
|
|
category = data.get('category')
|
|
|
|
freq = get_event_frequency(error_id=error_key, category=category)
|
|
if freq:
|
|
freq_line = f"Event frequency: {freq.get('occurrences', 1)} occurrence(s)"
|
|
if freq.get('first_seen_ago'):
|
|
freq_line += f", first seen {freq['first_seen_ago']}"
|
|
if freq.get('pattern'):
|
|
freq_line += f", {freq['pattern']}"
|
|
context_parts.append(freq_line)
|
|
|
|
# 3. SMART data for disk-related events
|
|
disk_related = any(x in event_type.lower() for x in ['disk', 'smart', 'storage', 'io_error'])
|
|
if not disk_related:
|
|
disk_related = any(x in combined_text.lower() for x in ['disk', 'smart', '/dev/sd', 'ata', 'i/o error'])
|
|
|
|
if disk_related:
|
|
disk_device = extract_disk_device(combined_text)
|
|
if disk_device:
|
|
smart_data = get_smart_data(disk_device)
|
|
if smart_data:
|
|
context_parts.append(smart_data)
|
|
|
|
# 4. Known error matching
|
|
known_error_ctx = get_error_context(combined_text, category=category, detail_level=detail_level)
|
|
if known_error_ctx:
|
|
context_parts.append(known_error_ctx)
|
|
|
|
# 5. Add original journal context
|
|
if journal_context:
|
|
context_parts.append(f"Journal logs:\n{journal_context}")
|
|
|
|
# Combine all parts
|
|
if context_parts:
|
|
return "\n\n".join(context_parts)
|
|
|
|
return journal_context or ""
|
|
|
|
|
|
def get_enriched_context(
|
|
event: 'NotificationEvent',
|
|
detail_level: str = 'standard'
|
|
) -> str:
|
|
"""Convenience function to enrich context from a NotificationEvent.
|
|
|
|
Args:
|
|
event: NotificationEvent object
|
|
detail_level: Level of detail
|
|
|
|
Returns:
|
|
Enriched context string
|
|
"""
|
|
journal_context = event.data.get('_journal_context', '')
|
|
|
|
return enrich_context_for_ai(
|
|
title=event.data.get('title', ''),
|
|
body=event.data.get('body', event.data.get('message', '')),
|
|
event_type=event.event_type,
|
|
data=event.data,
|
|
journal_context=journal_context,
|
|
detail_level=detail_level
|
|
)
|