#!/usr/bin/env python3 """ AI Context Enrichment Module Enriches notification context with additional information to help AI provide more accurate and helpful responses: 1. Event frequency - how often this error has occurred 2. System uptime - helps distinguish startup issues from runtime failures 3. SMART disk data - for disk-related errors 4. Known error matching - from proxmox_known_errors database Author: MacRimi """ import os import re import subprocess from datetime import datetime, timedelta from typing import Optional, Dict, Any import sqlite3 from pathlib import Path # Import known errors database try: from proxmox_known_errors import get_error_context, find_matching_error except ImportError: def get_error_context(*args, **kwargs): return None def find_matching_error(*args, **kwargs): return None DB_PATH = Path('/usr/local/share/proxmenux/health_monitor.db') def get_system_uptime() -> str: """Get system uptime in human-readable format. Returns: String like "2 minutes (recently booted)" or "89 days, 4 hours (stable system)" """ try: with open('/proc/uptime', 'r') as f: uptime_seconds = float(f.readline().split()[0]) days = int(uptime_seconds // 86400) hours = int((uptime_seconds % 86400) // 3600) minutes = int((uptime_seconds % 3600) // 60) # Build human-readable string parts = [] if days > 0: parts.append(f"{days} day{'s' if days != 1 else ''}") if hours > 0: parts.append(f"{hours} hour{'s' if hours != 1 else ''}") if not parts: # Less than an hour parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}") uptime_str = ", ".join(parts) # Add context hint if uptime_seconds < 600: # Less than 10 minutes return f"{uptime_str} (just booted - likely startup issue)" elif uptime_seconds < 3600: # Less than 1 hour return f"{uptime_str} (recently booted)" elif days >= 30: return f"{uptime_str} (stable system)" else: return uptime_str except Exception: return "unknown" def get_event_frequency(error_id: str = None, error_key: str = None, category: str = None, hours: int = 24) -> Optional[Dict[str, Any]]: """Get frequency information for an error from the database. Args: error_id: Specific error ID to look up error_key: Alternative error key category: Error category hours: Time window to check (default 24h) Returns: Dict with frequency info or None """ if not DB_PATH.exists(): return None try: conn = sqlite3.connect(str(DB_PATH), timeout=5) cursor = conn.cursor() # Try to find the error if error_id: cursor.execute(''' SELECT first_seen, last_seen, occurrences, category FROM errors WHERE error_key = ? OR error_id = ? ORDER BY last_seen DESC LIMIT 1 ''', (error_id, error_id)) elif error_key: cursor.execute(''' SELECT first_seen, last_seen, occurrences, category FROM errors WHERE error_key = ? ORDER BY last_seen DESC LIMIT 1 ''', (error_key,)) elif category: cursor.execute(''' SELECT first_seen, last_seen, occurrences, category FROM errors WHERE category = ? AND resolved_at IS NULL ORDER BY last_seen DESC LIMIT 1 ''', (category,)) else: conn.close() return None row = cursor.fetchone() conn.close() if not row: return None first_seen, last_seen, occurrences, cat = row # Calculate age try: first_dt = datetime.fromisoformat(first_seen) if first_seen else None last_dt = datetime.fromisoformat(last_seen) if last_seen else None now = datetime.now() result = { 'occurrences': occurrences or 1, 'category': cat } if first_dt: age = now - first_dt if age.total_seconds() < 3600: result['first_seen_ago'] = f"{int(age.total_seconds() / 60)} minutes ago" elif age.total_seconds() < 86400: result['first_seen_ago'] = f"{int(age.total_seconds() / 3600)} hours ago" else: result['first_seen_ago'] = f"{age.days} days ago" if last_dt and first_dt and occurrences and occurrences > 1: # Calculate average interval span = (last_dt - first_dt).total_seconds() if span > 0 and occurrences > 1: avg_interval = span / (occurrences - 1) if avg_interval < 60: result['pattern'] = f"recurring every ~{int(avg_interval)} seconds" elif avg_interval < 3600: result['pattern'] = f"recurring every ~{int(avg_interval / 60)} minutes" else: result['pattern'] = f"recurring every ~{int(avg_interval / 3600)} hours" return result except (ValueError, TypeError): return {'occurrences': occurrences or 1, 'category': cat} except Exception as e: print(f"[AIContext] Error getting frequency: {e}") return None def get_smart_data(disk_device: str) -> Optional[str]: """Get SMART health data for a disk. Args: disk_device: Device path like /dev/sda or just sda Returns: Formatted SMART summary or None """ if not disk_device: return None # Normalize device path if not disk_device.startswith('/dev/'): disk_device = f'/dev/{disk_device}' # Check device exists if not os.path.exists(disk_device): return None try: # Get health status result = subprocess.run( ['smartctl', '-H', disk_device], capture_output=True, text=True, timeout=10 ) health_status = "UNKNOWN" if "PASSED" in result.stdout: health_status = "PASSED" elif "FAILED" in result.stdout: health_status = "FAILED" # Get key attributes result = subprocess.run( ['smartctl', '-A', disk_device], capture_output=True, text=True, timeout=10 ) attributes = {} critical_attrs = [ 'Reallocated_Sector_Ct', 'Current_Pending_Sector', 'Offline_Uncorrectable', 'UDMA_CRC_Error_Count', 'Reallocated_Event_Count', 'Reported_Uncorrect' ] for line in result.stdout.split('\n'): for attr in critical_attrs: if attr in line: parts = line.split() # Typical format: ID ATTRIBUTE_NAME FLAGS VALUE WORST THRESH TYPE UPDATED RAW_VALUE if len(parts) >= 10: raw_value = parts[-1] attributes[attr] = raw_value # Build summary lines = [f"SMART Health: {health_status}"] # Add critical attributes if non-zero for attr, value in attributes.items(): try: if int(value) > 0: lines.append(f" {attr}: {value}") except ValueError: pass return "\n".join(lines) if len(lines) > 1 or health_status == "FAILED" else f"SMART Health: {health_status}" except subprocess.TimeoutExpired: return None except FileNotFoundError: # smartctl not installed return None except Exception: return None def extract_disk_device(text: str) -> Optional[str]: """Extract disk device name from error text. Args: text: Error message or log content Returns: Device name like 'sda' or None """ if not text: return None # Common patterns for disk devices in errors patterns = [ r'/dev/(sd[a-z]\d*)', r'/dev/(nvme\d+n\d+(?:p\d+)?)', r'/dev/(hd[a-z]\d*)', r'/dev/(vd[a-z]\d*)', r'\b(sd[a-z])\b', r'disk[_\s]+(sd[a-z])', r'ata\d+\.\d+: (sd[a-z])', ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: return match.group(1) return None def enrich_context_for_ai( title: str, body: str, event_type: str, data: Dict[str, Any], journal_context: str = '', detail_level: str = 'standard' ) -> str: """Build enriched context string for AI processing. Combines: - Original journal context - Event frequency information - System uptime - SMART data (for disk errors) - Known error matching Args: title: Notification title body: Notification body event_type: Type of event data: Event data dict journal_context: Original journal log context detail_level: Level of detail (minimal, standard, detailed) Returns: Enriched context string """ context_parts = [] combined_text = f"{title} {body} {journal_context}" # 1. System uptime - ONLY for critical system-level failures # Uptime helps distinguish startup issues from runtime failures # BUT it's noise for disk errors, warnings, or routine operations # Only include for: system crash, kernel panic, OOM, cluster failures uptime_critical_types = [ 'crash', 'panic', 'oom', 'kernel', 'split_brain', 'quorum_lost', 'node_offline', 'node_fail', 'system_fail', 'boot_fail' ] # Check if this is a critical system-level event (not disk/service/hardware) event_lower = event_type.lower() is_critical_system_event = any(t in event_lower for t in uptime_critical_types) # Only add uptime for critical system failures, nothing else if is_critical_system_event: uptime = get_system_uptime() if uptime and uptime != "unknown": context_parts.append(f"System uptime: {uptime}") # 2. Event frequency error_key = data.get('error_key') or data.get('error_id') category = data.get('category') freq = get_event_frequency(error_id=error_key, category=category) if freq: freq_line = f"Event frequency: {freq.get('occurrences', 1)} occurrence(s)" if freq.get('first_seen_ago'): freq_line += f", first seen {freq['first_seen_ago']}" if freq.get('pattern'): freq_line += f", {freq['pattern']}" context_parts.append(freq_line) # 3. SMART data for disk-related events disk_related = any(x in event_type.lower() for x in ['disk', 'smart', 'storage', 'io_error']) if not disk_related: disk_related = any(x in combined_text.lower() for x in ['disk', 'smart', '/dev/sd', 'ata', 'i/o error']) if disk_related: disk_device = extract_disk_device(combined_text) if disk_device: smart_data = get_smart_data(disk_device) if smart_data: context_parts.append(smart_data) # 4. Known error matching known_error_ctx = get_error_context(combined_text, category=category, detail_level=detail_level) if known_error_ctx: context_parts.append(known_error_ctx) # 5. Add original journal context if journal_context: context_parts.append(f"Journal logs:\n{journal_context}") # Combine all parts if context_parts: return "\n\n".join(context_parts) return journal_context or "" def get_enriched_context( event: 'NotificationEvent', detail_level: str = 'standard' ) -> str: """Convenience function to enrich context from a NotificationEvent. Args: event: NotificationEvent object detail_level: Level of detail Returns: Enriched context string """ journal_context = event.data.get('_journal_context', '') return enrich_context_for_ai( title=event.data.get('title', ''), body=event.data.get('body', event.data.get('message', '')), event_type=event.event_type, data=event.data, journal_context=journal_context, detail_level=detail_level )