Files
ProxMenux/AppImage/scripts/ai_context_enrichment.py
2026-04-02 16:59:09 +02:00

391 lines
12 KiB
Python

#!/usr/bin/env python3
"""
AI Context Enrichment Module
Enriches notification context with additional information to help AI provide
more accurate and helpful responses:
1. Event frequency - how often this error has occurred
2. System uptime - helps distinguish startup issues from runtime failures
3. SMART disk data - for disk-related errors
4. Known error matching - from proxmox_known_errors database
Author: MacRimi
"""
import os
import re
import subprocess
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import sqlite3
from pathlib import Path
# Import known errors database
try:
from proxmox_known_errors import get_error_context, find_matching_error
except ImportError:
def get_error_context(*args, **kwargs):
return None
def find_matching_error(*args, **kwargs):
return None
DB_PATH = Path('/usr/local/share/proxmenux/health_monitor.db')
def get_system_uptime() -> str:
"""Get system uptime in human-readable format.
Returns:
String like "2 minutes (recently booted)" or "89 days, 4 hours (stable system)"
"""
try:
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
days = int(uptime_seconds // 86400)
hours = int((uptime_seconds % 86400) // 3600)
minutes = int((uptime_seconds % 3600) // 60)
# Build human-readable string
parts = []
if days > 0:
parts.append(f"{days} day{'s' if days != 1 else ''}")
if hours > 0:
parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
if not parts: # Less than an hour
parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
uptime_str = ", ".join(parts)
# Add context hint
if uptime_seconds < 600: # Less than 10 minutes
return f"{uptime_str} (just booted - likely startup issue)"
elif uptime_seconds < 3600: # Less than 1 hour
return f"{uptime_str} (recently booted)"
elif days >= 30:
return f"{uptime_str} (stable system)"
else:
return uptime_str
except Exception:
return "unknown"
def get_event_frequency(error_id: str = None, error_key: str = None,
category: str = None, hours: int = 24) -> Optional[Dict[str, Any]]:
"""Get frequency information for an error from the database.
Args:
error_id: Specific error ID to look up
error_key: Alternative error key
category: Error category
hours: Time window to check (default 24h)
Returns:
Dict with frequency info or None
"""
if not DB_PATH.exists():
return None
try:
conn = sqlite3.connect(str(DB_PATH), timeout=5)
cursor = conn.cursor()
# Try to find the error
if error_id:
cursor.execute('''
SELECT first_seen, last_seen, occurrences, category
FROM errors WHERE error_key = ? OR error_id = ?
ORDER BY last_seen DESC LIMIT 1
''', (error_id, error_id))
elif error_key:
cursor.execute('''
SELECT first_seen, last_seen, occurrences, category
FROM errors WHERE error_key = ?
ORDER BY last_seen DESC LIMIT 1
''', (error_key,))
elif category:
cursor.execute('''
SELECT first_seen, last_seen, occurrences, category
FROM errors WHERE category = ? AND resolved_at IS NULL
ORDER BY last_seen DESC LIMIT 1
''', (category,))
else:
conn.close()
return None
row = cursor.fetchone()
conn.close()
if not row:
return None
first_seen, last_seen, occurrences, cat = row
# Calculate age
try:
first_dt = datetime.fromisoformat(first_seen) if first_seen else None
last_dt = datetime.fromisoformat(last_seen) if last_seen else None
now = datetime.now()
result = {
'occurrences': occurrences or 1,
'category': cat
}
if first_dt:
age = now - first_dt
if age.total_seconds() < 3600:
result['first_seen_ago'] = f"{int(age.total_seconds() / 60)} minutes ago"
elif age.total_seconds() < 86400:
result['first_seen_ago'] = f"{int(age.total_seconds() / 3600)} hours ago"
else:
result['first_seen_ago'] = f"{age.days} days ago"
if last_dt and first_dt and occurrences and occurrences > 1:
# Calculate average interval
span = (last_dt - first_dt).total_seconds()
if span > 0 and occurrences > 1:
avg_interval = span / (occurrences - 1)
if avg_interval < 60:
result['pattern'] = f"recurring every ~{int(avg_interval)} seconds"
elif avg_interval < 3600:
result['pattern'] = f"recurring every ~{int(avg_interval / 60)} minutes"
else:
result['pattern'] = f"recurring every ~{int(avg_interval / 3600)} hours"
return result
except (ValueError, TypeError):
return {'occurrences': occurrences or 1, 'category': cat}
except Exception as e:
print(f"[AIContext] Error getting frequency: {e}")
return None
def get_smart_data(disk_device: str) -> Optional[str]:
"""Get SMART health data for a disk.
Args:
disk_device: Device path like /dev/sda or just sda
Returns:
Formatted SMART summary or None
"""
if not disk_device:
return None
# Normalize device path
if not disk_device.startswith('/dev/'):
disk_device = f'/dev/{disk_device}'
# Check device exists
if not os.path.exists(disk_device):
return None
try:
# Get health status
result = subprocess.run(
['smartctl', '-H', disk_device],
capture_output=True, text=True, timeout=10
)
health_status = "UNKNOWN"
if "PASSED" in result.stdout:
health_status = "PASSED"
elif "FAILED" in result.stdout:
health_status = "FAILED"
# Get key attributes
result = subprocess.run(
['smartctl', '-A', disk_device],
capture_output=True, text=True, timeout=10
)
attributes = {}
critical_attrs = [
'Reallocated_Sector_Ct', 'Current_Pending_Sector',
'Offline_Uncorrectable', 'UDMA_CRC_Error_Count',
'Reallocated_Event_Count', 'Reported_Uncorrect'
]
for line in result.stdout.split('\n'):
for attr in critical_attrs:
if attr in line:
parts = line.split()
# Typical format: ID ATTRIBUTE_NAME FLAGS VALUE WORST THRESH TYPE UPDATED RAW_VALUE
if len(parts) >= 10:
raw_value = parts[-1]
attributes[attr] = raw_value
# Build summary
lines = [f"SMART Health: {health_status}"]
# Add critical attributes if non-zero
for attr, value in attributes.items():
try:
if int(value) > 0:
lines.append(f" {attr}: {value}")
except ValueError:
pass
return "\n".join(lines) if len(lines) > 1 or health_status == "FAILED" else f"SMART Health: {health_status}"
except subprocess.TimeoutExpired:
return None
except FileNotFoundError:
# smartctl not installed
return None
except Exception:
return None
def extract_disk_device(text: str) -> Optional[str]:
"""Extract disk device name from error text.
Args:
text: Error message or log content
Returns:
Device name like 'sda' or None
"""
if not text:
return None
# Common patterns for disk devices in errors
patterns = [
r'/dev/(sd[a-z]\d*)',
r'/dev/(nvme\d+n\d+(?:p\d+)?)',
r'/dev/(hd[a-z]\d*)',
r'/dev/(vd[a-z]\d*)',
r'\b(sd[a-z])\b',
r'disk[_\s]+(sd[a-z])',
r'ata\d+\.\d+: (sd[a-z])',
]
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return match.group(1)
return None
def enrich_context_for_ai(
title: str,
body: str,
event_type: str,
data: Dict[str, Any],
journal_context: str = '',
detail_level: str = 'standard'
) -> str:
"""Build enriched context string for AI processing.
Combines:
- Original journal context
- Event frequency information
- System uptime
- SMART data (for disk errors)
- Known error matching
Args:
title: Notification title
body: Notification body
event_type: Type of event
data: Event data dict
journal_context: Original journal log context
detail_level: Level of detail (minimal, standard, detailed)
Returns:
Enriched context string
"""
context_parts = []
combined_text = f"{title} {body} {journal_context}"
# 1. System uptime - ONLY for critical system-level failures
# Uptime helps distinguish startup issues from runtime failures
# BUT it's noise for disk errors, warnings, or routine operations
# Only include for: system crash, kernel panic, OOM, cluster failures
uptime_critical_types = [
'crash', 'panic', 'oom', 'kernel',
'split_brain', 'quorum_lost', 'node_offline', 'node_fail',
'system_fail', 'boot_fail'
]
# Check if this is a critical system-level event (not disk/service/hardware)
event_lower = event_type.lower()
is_critical_system_event = any(t in event_lower for t in uptime_critical_types)
# Only add uptime for critical system failures, nothing else
if is_critical_system_event:
uptime = get_system_uptime()
if uptime and uptime != "unknown":
context_parts.append(f"System uptime: {uptime}")
# 2. Event frequency
error_key = data.get('error_key') or data.get('error_id')
category = data.get('category')
freq = get_event_frequency(error_id=error_key, category=category)
if freq:
freq_line = f"Event frequency: {freq.get('occurrences', 1)} occurrence(s)"
if freq.get('first_seen_ago'):
freq_line += f", first seen {freq['first_seen_ago']}"
if freq.get('pattern'):
freq_line += f", {freq['pattern']}"
context_parts.append(freq_line)
# 3. SMART data for disk-related events
disk_related = any(x in event_type.lower() for x in ['disk', 'smart', 'storage', 'io_error'])
if not disk_related:
disk_related = any(x in combined_text.lower() for x in ['disk', 'smart', '/dev/sd', 'ata', 'i/o error'])
if disk_related:
disk_device = extract_disk_device(combined_text)
if disk_device:
smart_data = get_smart_data(disk_device)
if smart_data:
context_parts.append(smart_data)
# 4. Known error matching
known_error_ctx = get_error_context(combined_text, category=category, detail_level=detail_level)
if known_error_ctx:
context_parts.append(known_error_ctx)
# 5. Add original journal context
if journal_context:
context_parts.append(f"Journal logs:\n{journal_context}")
# Combine all parts
if context_parts:
return "\n\n".join(context_parts)
return journal_context or ""
def get_enriched_context(
event: 'NotificationEvent',
detail_level: str = 'standard'
) -> str:
"""Convenience function to enrich context from a NotificationEvent.
Args:
event: NotificationEvent object
detail_level: Level of detail
Returns:
Enriched context string
"""
journal_context = event.data.get('_journal_context', '')
return enrich_context_for_ai(
title=event.data.get('title', ''),
body=event.data.get('body', event.data.get('message', '')),
event_type=event.event_type,
data=event.data,
journal_context=journal_context,
detail_level=detail_level
)