""" ProxMenux Health Monitor Module Provides comprehensive, lightweight health checks for Proxmox systems. Optimized for minimal system impact with intelligent thresholds and hysteresis. Author: MacRimi Version: 1.2 (Always returns all 10 categories) """ import psutil import subprocess import json import time import os import hashlib # Added for MD5 hashing from typing import Dict, List, Any, Tuple, Optional from datetime import datetime, timedelta from collections import defaultdict import re from health_persistence import health_persistence try: from proxmox_storage_monitor import proxmox_storage_monitor PROXMOX_STORAGE_AVAILABLE = True except ImportError: PROXMOX_STORAGE_AVAILABLE = False class HealthMonitor: """ Monitors system health across multiple components with minimal impact. Implements hysteresis, intelligent caching, progressive escalation, and persistent error tracking. Always returns all 10 health categories. """ # CPU Thresholds CPU_WARNING = 85 CPU_CRITICAL = 95 CPU_RECOVERY = 75 CPU_WARNING_DURATION = 300 # 5 minutes sustained CPU_CRITICAL_DURATION = 300 # 5 minutes sustained CPU_RECOVERY_DURATION = 120 # Memory Thresholds MEMORY_WARNING = 85 MEMORY_CRITICAL = 95 MEMORY_DURATION = 60 SWAP_WARNING_DURATION = 300 SWAP_CRITICAL_PERCENT = 5 SWAP_CRITICAL_DURATION = 120 # Storage Thresholds STORAGE_WARNING = 85 STORAGE_CRITICAL = 95 # Temperature Thresholds TEMP_WARNING = 80 TEMP_CRITICAL = 90 # Network Thresholds NETWORK_LATENCY_WARNING = 100 NETWORK_LATENCY_CRITICAL = 300 NETWORK_TIMEOUT = 2 NETWORK_INACTIVE_DURATION = 600 # Log Thresholds LOG_ERRORS_WARNING = 5 LOG_ERRORS_CRITICAL = 10 LOG_WARNINGS_WARNING = 15 LOG_WARNINGS_CRITICAL = 30 LOG_CHECK_INTERVAL = 300 # Updates Thresholds UPDATES_WARNING = 365 # Only warn after 1 year without updates (system_age) UPDATES_CRITICAL = 548 # Critical after 18 months without updates SECURITY_WARN_DAYS = 360 # Security updates only become WARNING after 360 days unpatched BENIGN_ERROR_PATTERNS = [ # ── Proxmox 
API / proxy operational noise ── r'got inotify poll request in wrong process', r'auth key pair too old, rotating', r'proxy detected vanished client connection', r'worker \d+ finished', r'connection timed out', r'disconnect peer', r'task OK', r'backup finished', # PVE ticket / auth transient errors (web UI session expiry, API token # refresh, brute-force bots). These are logged at WARNING/ERR level # but are NOT system problems -- they are access-control events. r'invalid PVE ticket', r'authentication failure.*pve', r'permission denied.*ticket', r'no ticket', r'CSRF.*failed', r'pveproxy\[\d+\]: authentication failure', r'pvedaemon\[\d+\]: authentication failure', # PVE cluster/corosync normal chatter r'corosync.*retransmit', r'corosync.*delivering', r'pmxcfs.*update', r'pve-cluster\[\d+\]:.*status', # ── Systemd informational messages ── r'(started|starting|stopped|stopping) session', r'session \d+ logged (in|out)', r'new session \d+ of user', r'removed session \d+', r'user@\d+\.service:', r'user runtime directory', # Systemd service restarts (normal lifecycle) r'systemd\[\d+\]: .+\.service: (Scheduled restart|Consumed)', r'systemd\[\d+\]: .+\.service: Deactivated successfully', # ── Network transient errors (common and usually self-recovering) ── r'dhcp.*timeout', r'temporary failure in name resolution', r'network is unreachable', r'no route to host', # ── Backup and sync normal warnings ── r'rsync.*vanished', r'backup job .* finished', r'vzdump backup .* finished', # ── ZFS informational ── r'zfs.*scrub (started|finished|in progress)', r'zpool.*resilver', # ── LXC/Container normal operations ── r'lxc.*monitor', r'systemd\[1\]: (started|stopped) .*\.scope', # ── ATA/SCSI transient bus errors ── # These are logged at ERR level but are common on SATA controllers # during hot-plug, link renegotiation, or cable noise. They are NOT # indicative of disk failure unless SMART also reports problems. 
r'ata\d+.*SError.*BadCRC', r'ata\d+.*Emask 0x10.*ATA bus error', r'failed command: (READ|WRITE) FPDMA QUEUED', r'ata\d+.*hard resetting link', r'ata\d+.*link is slow', r'ata\d+.*COMRESET', # ── ProxMenux self-referential noise ── # The monitor reporting its OWN service failures is circular -- # it cannot meaningfully alert about itself. r'proxmenux-monitor\.service.*Failed', r'proxmenux-monitor\.service.*exit-code', r'ProxMenux-Monitor.*Failed at step EXEC', # ── PVE scheduler operational noise ── # pvescheduler emits "could not update job state" every minute # when a scheduled job reference is stale. This is cosmetic, # not a system problem. r'pvescheduler.*could not update job state', r'pvescheduler.*no such task', ] CRITICAL_LOG_KEYWORDS = [ 'out of memory', 'oom_kill', 'kernel panic', 'filesystem read-only', 'cannot mount', 'raid.*failed', 'md.*device failed', 'ext4-fs error', 'xfs.*corruption', 'lvm activation failed', 'hardware error', 'mce:', 'general protection fault', ] # Segfault is WARNING, not CRITICAL -- only PVE-critical process # segfaults are escalated to CRITICAL in _classify_log_severity. 
    # Process names whose segfaults are escalated to CRITICAL (see note above
    # CRITICAL_LOG_KEYWORDS); all other segfaults stay at WARNING.
    PVE_CRITICAL_PROCESSES = {
        'pveproxy', 'pvedaemon', 'pvestatd', 'pve-cluster', 'corosync',
        'qemu-system', 'lxc-start', 'ceph-osd', 'ceph-mon', 'pmxcfs', 'kvm',
    }

    # Substrings/regex fragments whose presence in a log line marks it WARNING.
    WARNING_LOG_KEYWORDS = [
        'i/o error',
        'ata error',
        'scsi error',
        'task hung',
        'blocked for more than',
        'failed to start',
        'service.*failed',
        'disk.*offline',
        'disk.*removed',
        'segfault',  # WARNING by default; escalated to CRITICAL only for PVE processes
    ]

    # PVE Critical Services (systemd unit names checked by _check_pve_services)
    PVE_SERVICES = ['pveproxy', 'pvedaemon', 'pvestatd', 'pve-cluster']

    def __init__(self) -> None:
        """Initialize health monitor with state tracking"""
        # Rolling per-metric sample history keyed by state name (e.g. 'cpu_usage').
        self.state_history = defaultdict(list)
        # Timestamp and result caches shared by the cheap "cached" accessors.
        self.last_check_times = {}
        self.cached_results = {}
        self.network_baseline = {}
        self.io_error_history = defaultdict(list)
        self.failed_vm_history = set()  # Track VMs that failed to start
        self.persistent_log_patterns = defaultdict(lambda: {'count': 0, 'first_seen': 0, 'last_seen': 0})
        self._unknown_counts = {}  # Track consecutive UNKNOWN cycles per category
        # System capabilities - derived from Proxmox storage types at runtime (Priority 1.5)
        # SMART detection still uses filesystem check on init (lightweight)
        has_smart = os.path.exists('/usr/sbin/smartctl') or os.path.exists('/usr/bin/smartctl')
        self.capabilities = {'has_zfs': False, 'has_lvm': False, 'has_smart': has_smart}
        try:
            health_persistence.cleanup_old_errors()
        except Exception as e:
            # Best-effort cleanup: persistence problems must not prevent startup.
            print(f"[HealthMonitor] Cleanup warning: {e}")

    # ─── Lightweight sampling methods for the dedicated vital-signs thread ───
    # These ONLY append data to state_history without triggering evaluation,
    # persistence, or subprocess-heavy operations.

    def _sample_cpu_usage(self) -> None:
        """Lightweight CPU sample: read usage % and append to history. ~30ms cost."""
        try:
            # interval=0 returns the value since the last call (non-blocking).
            cpu_percent = psutil.cpu_percent(interval=0)
            current_time = time.time()
            state_key = 'cpu_usage'
            self.state_history[state_key].append({
                'value': cpu_percent,
                'time': current_time
            })
            # Prune entries older than 6 minutes
            self.state_history[state_key] = [
                e for e in self.state_history[state_key]
                if current_time - e['time'] < 360
            ]
        except Exception:
            pass  # Sampling must never crash the thread

    def _sample_cpu_temperature(self) -> None:
        """Lightweight temperature sample: read sensor and append to history. ~50ms cost."""
        try:
            result = subprocess.run(
                ['sensors', '-A', '-u'],
                capture_output=True, text=True, timeout=2
            )
            if result.returncode != 0:
                return
            # Collect every 'tempN_input' reading from the raw sensors output.
            temps = []
            for line in result.stdout.split('\n'):
                if 'temp' in line.lower() and '_input' in line:
                    try:
                        temp = float(line.split(':')[1].strip())
                        temps.append(temp)
                    except Exception:
                        continue
            if temps:
                # Track the hottest sensor only.
                max_temp = max(temps)
                current_time = time.time()
                state_key = 'cpu_temp_history'
                self.state_history[state_key].append({
                    'value': max_temp,
                    'time': current_time
                })
                # Prune entries older than 4 minutes
                self.state_history[state_key] = [
                    e for e in self.state_history[state_key]
                    if current_time - e['time'] < 240
                ]
        except Exception:
            pass  # Sampling must never crash the thread

    def get_system_info(self) -> Dict[str, Any]:
        """
        Get lightweight system info for header display.
        Returns: hostname, uptime, and cached health status.
        This is extremely lightweight and uses cached health status.
        """
        try:
            # Get hostname
            hostname = os.uname().nodename
            # Get uptime (very cheap operation)
            uptime_seconds = time.time() - psutil.boot_time()
            # Get cached health status (no expensive checks)
            health_status = self.get_cached_health_status()
            return {
                'hostname': hostname,
                'uptime_seconds': int(uptime_seconds),
                'uptime': self._format_uptime(uptime_seconds),
                'health': health_status,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            # Same shape as the success path so callers never need to branch.
            return {
                'hostname': 'unknown',
                'uptime_seconds': 0,
                'uptime': 'Unknown',
                'health': {'status': 'UNKNOWN', 'summary': f'Error: {str(e)}'},
                'timestamp': datetime.now().isoformat()
            }

    def _format_uptime(self, seconds: float) -> str:
        """Format uptime in human-readable format (e.g. '2d 3h 4m')."""
        days = int(seconds // 86400)
        hours = int((seconds % 86400) // 3600)
        minutes = int((seconds % 3600) // 60)
        if days > 0:
            return f"{days}d {hours}h {minutes}m"
        elif hours > 0:
            return f"{hours}h {minutes}m"
        else:
            return f"{minutes}m"

    def get_cached_health_status(self) -> Dict[str, str]:
        """
        Get cached health status without running expensive checks.
        The background health collector keeps '_bg_overall' always fresh (every 5 min).
        Falls back to calculating on demand if background data is stale or unavailable.
        """
        current_time = time.time()
        # 1. Check background collector cache (updated every 5 min by _health_collector_loop)
        bg_key = '_bg_overall'
        if bg_key in self.last_check_times:
            age = current_time - self.last_check_times[bg_key]
            if age < 360:  # 6 min (5 min interval + 1 min tolerance)
                return self.cached_results.get(bg_key, {'status': 'OK', 'summary': 'System operational'})
        # 2. Check regular cache (updated by modal fetches or on-demand)
        cache_key = 'overall_health'
        if cache_key in self.last_check_times:
            if current_time - self.last_check_times[cache_key] < 60:
                return self.cached_results.get(cache_key, {'status': 'OK', 'summary': 'System operational'})
        # 3. No fresh cache - calculate on demand (happens only on first load before bg thread runs)
        status = self.get_overall_status()
        self.cached_results[cache_key] = {
            'status': status['status'],
            'summary': status['summary']
        }
        self.last_check_times[cache_key] = current_time
        return self.cached_results[cache_key]

    def get_overall_status(self) -> Dict[str, Any]:
        """Get overall health status summary with minimal overhead"""
        details = self.get_detailed_status()
        overall_status = details.get('overall', 'OK')
        summary = details.get('summary', '')
        # Count statuses per category for the summary counters.
        critical_count = 0
        warning_count = 0
        ok_count = 0
        for category, data in details.get('details', {}).items():
            if isinstance(data, dict):
                status = data.get('status', 'OK')
                if status == 'CRITICAL':
                    critical_count += 1
                elif status == 'WARNING':
                    warning_count += 1
                elif status == 'OK':
                    ok_count += 1
        return {
            'status': overall_status,
            'summary': summary,
            'critical_count': critical_count,
            'warning_count': warning_count,
            'ok_count': ok_count,
            'timestamp': datetime.now().isoformat()
        }

    def get_detailed_status(self) -> Dict[str, Any]:
        """
        Get comprehensive health status with all checks.
        Returns JSON structure with ALL 10 categories always present.
        Now includes persistent error tracking.
        """
        # Run cleanup on every status check so stale errors are auto-resolved
        # using the user-configured Suppression Duration (single source of truth).
        try:
            health_persistence.cleanup_old_errors()
        except Exception:
            pass
        active_errors = health_persistence.get_active_errors()
        # No need to create persistent_issues dict here, it's implicitly handled by the checks
        # All 10 categories start as OK and are overwritten by the checks below.
        details = {
            'cpu': {'status': 'OK'},
            'memory': {'status': 'OK'},
            'storage': {'status': 'OK'},  # This will be overwritten by specific storage checks
            'disks': {'status': 'OK'},  # This will be overwritten by disk/filesystem checks
            'network': {'status': 'OK'},
            'vms': {'status': 'OK'},
            'services': {'status': 'OK'},
            'logs': {'status': 'OK'},
            'updates': {'status': 'OK'},
            'security': {'status': 'OK'}
        }
        critical_issues = []
        warning_issues = []
        info_issues = []  # Added info_issues to track INFO separately
        # --- Priority Order of Checks ---
        # Priority 1: Critical PVE Services
        services_status = self._check_pve_services()
        details['services'] = services_status
        if services_status['status'] == 'CRITICAL':
            critical_issues.append(f"PVE Services: {services_status.get('reason', 'Service failure')}")
        elif services_status['status'] == 'WARNING':
            warning_issues.append(f"PVE Services: {services_status.get('reason', 'Service issue')}")
        # Priority 1.5: Proxmox Storage Check (External Module)
        proxmox_storage_result = self._check_proxmox_storage()
        if proxmox_storage_result:  # Only process if the check ran (module available)
            details['storage'] = proxmox_storage_result
            if proxmox_storage_result.get('status') == 'CRITICAL':
                critical_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage unavailable'))
            elif proxmox_storage_result.get('status') == 'WARNING':
                warning_issues.append(proxmox_storage_result.get('reason', 'Proxmox storage issue'))
            # Derive capabilities from Proxmox storage types (immediate, no extra checks)
            # NOTE(review): assumes each check's 'detail' begins with the storage
            # type token (e.g. "zfspool ...") -- verify against the external module.
            storage_checks = proxmox_storage_result.get('checks', {})
            storage_types = {v.get('detail', '').split(' ')[0].lower() for v in storage_checks.values() if isinstance(v, dict)}
            self.capabilities['has_zfs'] = any(t in ('zfspool', 'zfs') for t in storage_types)
            self.capabilities['has_lvm'] = any(t in ('lvm', 'lvmthin') for t in storage_types)
        # Priority 2: Disk/Filesystem Health (Internal checks: usage, ZFS, SMART, IO errors)
        storage_status = self._check_storage_optimized()
        details['disks'] = storage_status  # Use 'disks' for filesystem/disk specific issues
        if storage_status.get('status') == 'CRITICAL':
            critical_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage failure')}")
        elif storage_status.get('status') == 'WARNING':
            warning_issues.append(f"Storage/Disks: {storage_status.get('reason', 'Disk/Storage issue')}")
        # Priority 3: VMs/CTs Status (with persistence)
        vms_status = self._check_vms_cts_with_persistence()
        details['vms'] = vms_status
        if vms_status.get('status') == 'CRITICAL':
            critical_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT failure')}")
        elif vms_status.get('status') == 'WARNING':
            warning_issues.append(f"VMs/CTs: {vms_status.get('reason', 'VM/CT issue')}")
        # Priority 4: Network Connectivity
        network_status = self._check_network_optimized()
        details['network'] = network_status
        if network_status.get('status') == 'CRITICAL':
            critical_issues.append(f"Network: {network_status.get('reason', 'Network failure')}")
        elif network_status.get('status') == 'WARNING':
            warning_issues.append(f"Network: {network_status.get('reason', 'Network issue')}")
        # Priority 5: CPU Usage (with hysteresis)
        cpu_status = self._check_cpu_with_hysteresis()
        details['cpu'] = cpu_status
        if cpu_status.get('status') == 'CRITICAL':
            critical_issues.append(f"CPU: {cpu_status.get('reason', 'CPU critical')}")
        elif cpu_status.get('status') == 'WARNING':
            warning_issues.append(f"CPU: {cpu_status.get('reason', 'CPU high')}")
        # Priority 6: Memory Usage (RAM and Swap)
        memory_status = self._check_memory_comprehensive()
        details['memory'] = memory_status
        if memory_status.get('status') == 'CRITICAL':
            critical_issues.append(f"Memory: {memory_status.get('reason', 'Memory critical')}")
        elif memory_status.get('status') == 'WARNING':
            warning_issues.append(f"Memory: {memory_status.get('reason', 'Memory high')}")
        # Priority 7: Log Analysis (with persistence)
        logs_status = self._check_logs_with_persistence()
        details['logs'] = logs_status
        if logs_status.get('status') == 'CRITICAL':
            critical_issues.append(f"Logs: {logs_status.get('reason', 'Critical log errors')}")
        elif logs_status.get('status') == 'WARNING':
            warning_issues.append(f"Logs: {logs_status.get('reason', 'Log warnings')}")
        # Priority 8: System Updates
        updates_status = self._check_updates()
        details['updates'] = updates_status
        if updates_status.get('status') == 'CRITICAL':
            critical_issues.append(f"Updates: {updates_status.get('reason', 'System not updated')}")
        elif updates_status.get('status') == 'WARNING':
            warning_issues.append(f"Updates: {updates_status.get('reason', 'Updates pending')}")
        elif updates_status.get('status') == 'INFO':
            info_issues.append(f"Updates: {updates_status.get('reason', 'Informational update notice')}")
        # Priority 9: Security Checks
        security_status = self._check_security()
        details['security'] = security_status
        if security_status.get('status') == 'WARNING':
            warning_issues.append(f"Security: {security_status.get('reason', 'Security issue')}")
        elif security_status.get('status') == 'INFO':
            info_issues.append(f"Security: {security_status.get('reason', 'Security information')}")
        # --- Track UNKNOWN counts and persist if >= 3 consecutive cycles ---
        unknown_issues = []
        for cat_key, cat_data in details.items():
            cat_status = cat_data.get('status', 'OK')
            if cat_status == 'UNKNOWN':
                count = self._unknown_counts.get(cat_key, 0) + 1
                self._unknown_counts[cat_key] = min(count, 10)  # Cap to avoid unbounded growth
                unknown_issues.append(f"{cat_key}: {cat_data.get('reason', 'Check unavailable')}")
                if count == 3:  # Only persist on the exact 3rd cycle, not every cycle after
                    try:
                        health_persistence.record_unknown_persistent(
                            cat_key, cat_data.get('reason', 'Check unavailable'))
                    except Exception:
                        pass
            else:
                self._unknown_counts[cat_key] = 0
        # --- Determine Overall Status ---
        # Severity: CRITICAL > WARNING > UNKNOWN (capped at WARNING) > INFO > OK
        if critical_issues:
            overall = 'CRITICAL'
            summary = '; '.join(critical_issues[:3])
        elif warning_issues:
            overall = 'WARNING'
            summary = '; '.join(warning_issues[:3])
        elif unknown_issues:
            overall = 'WARNING'  # UNKNOWN caps at WARNING, never escalates to CRITICAL
            summary = '; '.join(unknown_issues[:3])
        elif info_issues:
            overall = 'OK'  # INFO statuses don't degrade overall health
            summary = '; '.join(info_issues[:3])
        else:
            overall = 'OK'
            summary = 'All systems operational'
        # --- Emit events for state changes (Bloque A: Notification prep) ---
        try:
            previous_overall = getattr(self, '_last_overall_status', None)
            if previous_overall and previous_overall != overall:
                # Overall status changed - emit event
                health_persistence.emit_event(
                    event_type='state_change',
                    category='overall',
                    severity=overall,
                    data={
                        'previous': previous_overall,
                        'current': overall,
                        'summary': summary
                    }
                )
            # Track per-category state changes
            previous_details = getattr(self, '_last_category_statuses', {})
            for cat_key, cat_data in details.items():
                cat_status = cat_data.get('status', 'OK')
                prev_status = previous_details.get(cat_key, 'OK')
                if prev_status != cat_status and cat_status in ('WARNING', 'CRITICAL'):
                    health_persistence.emit_event(
                        event_type='state_change',
                        category=cat_key,
                        severity=cat_status,
                        data={
                            'previous': prev_status,
                            'current': cat_status,
                            'reason': cat_data.get('reason', '')
                        }
                    )
            self._last_overall_status = overall
            self._last_category_statuses = {k: v.get('status', 'OK') for k, v in details.items()}
        except Exception:
            pass  # Event emission should never break health checks
        return {
            'overall': overall,
            'summary': summary,
            'details': details,
            'timestamp': datetime.now().isoformat()
        }

    def _check_cpu_with_hysteresis(self) -> Dict[str, Any]:
        """Check CPU with hysteresis to avoid flapping alerts - requires 5min sustained high usage"""
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            current_time = time.time()
            state_key = 'cpu_usage'
            # Add this reading as well (supplements the sampler thread)
            self.state_history[state_key].append({
                'value': cpu_percent,
                'time': current_time
            })
            # Snapshot the list for thread-safe reading (sampler may append concurrently)
            cpu_snapshot = list(self.state_history[state_key])
            # Prune old entries via snapshot replacement (atomic assignment)
            self.state_history[state_key] = [
                entry for entry in cpu_snapshot
                if current_time - entry['time'] < 360
            ]
            critical_samples = [
                entry for entry in self.state_history[state_key]
                if entry['value'] >= self.CPU_CRITICAL and current_time - entry['time'] <= self.CPU_CRITICAL_DURATION
            ]
            warning_samples = [
                entry for entry in self.state_history[state_key]
                if entry['value'] >= self.CPU_WARNING and current_time - entry['time'] <= self.CPU_WARNING_DURATION
            ]
            recovery_samples = [
                entry for entry in self.state_history[state_key]
                if entry['value'] < self.CPU_RECOVERY and current_time - entry['time'] <= self.CPU_RECOVERY_DURATION
            ]
            # Hysteresis: need >= 3 hot samples to alert; recent recovery samples
            # suppress a WARNING so the status doesn't flap around the threshold.
            if len(critical_samples) >= 3:
                status = 'CRITICAL'
                reason = f'CPU >{self.CPU_CRITICAL}% sustained for {self.CPU_CRITICAL_DURATION}s'
            elif len(warning_samples) >= 3 and len(recovery_samples) < 2:
                status = 'WARNING'
                reason = f'CPU >{self.CPU_WARNING}% sustained for {self.CPU_WARNING_DURATION}s'
            else:
                status = 'OK'
                reason = None
            temp_status = self._check_cpu_temperature()
            result = {
                'status': status,
                'usage': round(cpu_percent, 1),
                'cores': psutil.cpu_count()
            }
            if reason:
                result['reason'] = reason
            # A temperature problem can escalate the CPU status but never lower it.
            if temp_status and temp_status.get('status') != 'UNKNOWN':
                result['temperature'] = temp_status
                if temp_status.get('status') == 'CRITICAL':
                    result['status'] = 'CRITICAL'
                    result['reason'] = temp_status.get('reason')
                elif temp_status.get('status') == 'WARNING' and status == 'OK':
                    result['status'] = 'WARNING'
                    result['reason'] = temp_status.get('reason')
            # Build checks dict for frontend expandable section
            checks = {
                'cpu_usage': {
                    'status': status,
                    'detail': 'Sustained high CPU usage' if status != 'OK' else 'Normal'
                }
            }
            if temp_status and temp_status.get('status') != 'UNKNOWN':
                t_status = temp_status.get('status', 'OK')
                checks['cpu_temperature'] = {
                    'status': t_status,
                    'detail': 'Temperature elevated' if t_status != 'OK' else 'Normal'
                }
            else:
                checks['cpu_temperature'] = {
                    'status': 'INFO',
                    'detail': 'No temperature sensor detected - install lm-sensors if hardware supports it',
                }
            result['checks'] = checks
            return result
        except Exception as e:
            return {'status': 'UNKNOWN', 'reason': f'CPU check failed: {str(e)}'}

    def _check_cpu_temperature(self) -> Optional[Dict[str, Any]]:
        """
        Check CPU temperature with temporal logic:
        - WARNING if temp >80°C sustained for >3 minutes
        - Auto-clears if temp ≤80°C for 30 seconds
        - No dismiss button (non-dismissable)

        Returns None when no sensor data is available or the check fails.
        """
        cache_key = 'cpu_temp'
        current_time = time.time()
        # Check every 10 seconds instead of 60
        if cache_key in self.last_check_times:
            if current_time - self.last_check_times[cache_key] < 10:
                return self.cached_results.get(cache_key)
        try:
            result = subprocess.run(
                ['sensors', '-A', '-u'],
                capture_output=True, text=True, timeout=2
            )
            if result.returncode == 0:
                # Parse every 'tempN_input' reading from the raw sensors output.
                temps = []
                for line in result.stdout.split('\n'):
                    if 'temp' in line.lower() and '_input' in line:
                        try:
                            temp = float(line.split(':')[1].strip())
                            temps.append(temp)
                        except:
                            continue
                if temps:
                    max_temp = max(temps)
                    state_key = 'cpu_temp_history'
                    # Add this reading (supplements the sampler thread)
                    self.state_history[state_key].append({
                        'value': max_temp,
                        'time': current_time
                    })
                    # Snapshot for thread-safe reading, then atomic prune
                    temp_snapshot = list(self.state_history[state_key])
                    self.state_history[state_key] = [
                        entry for entry in temp_snapshot
                        if current_time - entry['time'] < 240
                    ]
                    # Check if temperature >80°C for more than 3 minutes (180 seconds)
                    high_temp_samples = [
                        entry for entry in self.state_history[state_key]
                        if entry['value'] > 80 and current_time - entry['time'] <= 180
                    ]
                    # Check if temperature ≤80°C for last 30 seconds (recovery)
                    recovery_samples = [
                        entry for entry in self.state_history[state_key]
                        if entry['value'] <= 80 and current_time - entry['time'] <= 30
                    ]
                    # Require at least 18 samples over 3 minutes (one every 10 seconds) to trigger alert
                    if len(high_temp_samples) >= 18:
                        # Temperature has been >80°C for >3 minutes
                        status = 'WARNING'
                        reason = f'CPU temperature {max_temp}°C >80°C sustained >3min'
                        # Record non-dismissable error
                        health_persistence.record_error(
                            error_key='cpu_temperature',
                            category='temperature',
                            severity='WARNING',
                            reason=reason,
                            details={'temperature': max_temp, 'dismissable': False}
                        )
                    elif len(recovery_samples) >= 3:
                        # Temperature has been ≤80°C for 30 seconds - clear the error
                        status = 'OK'
                        reason = None
                        health_persistence.resolve_error('cpu_temperature', 'Temperature recovered')
                    else:
                        # Temperature is elevated but not long enough, or recovering but not yet cleared
                        # Check if we already have an active error
                        if health_persistence.is_error_active('cpu_temperature', category='temperature'):
                            # Keep the warning active
                            status = 'WARNING'
                            reason = f'CPU temperature {max_temp}°C still elevated'
                        else:
                            # No active warning yet
                            status = 'OK'
                            reason = None
                    temp_result = {
                        'status': status,
                        'value': round(max_temp, 1),
                        'unit': '°C'
                    }
                    if reason:
                        temp_result['reason'] = reason
                    self.cached_results[cache_key] = temp_result
                    self.last_check_times[cache_key] = current_time
                    return temp_result
            return None
        except Exception:
            return None

    def _check_memory_comprehensive(self) -> Dict[str, Any]:
        """
        Check memory including RAM and swap with realistic thresholds.
        Only alerts on truly problematic memory situations.
        """
        try:
            memory = psutil.virtual_memory()
            swap = psutil.swap_memory()
            current_time = time.time()
            mem_percent = memory.percent
            swap_percent = swap.percent if swap.total > 0 else 0
            # Swap used relative to total RAM -- the signal for memory pressure.
            swap_vs_ram = (swap.used / memory.total * 100) if memory.total > 0 else 0
            state_key = 'memory_usage'
            self.state_history[state_key].append({
                'mem_percent': mem_percent,
                'swap_percent': swap_percent,
                'swap_vs_ram': swap_vs_ram,
                'time': current_time
            })
            # Keep a 10-minute rolling window of samples.
            self.state_history[state_key] = [
                entry for entry in self.state_history[state_key]
                if current_time - entry['time'] < 600
            ]
            mem_critical = sum(
                1 for entry in self.state_history[state_key]
                if entry['mem_percent'] >= 90 and current_time - entry['time'] <= self.MEMORY_DURATION
            )
            mem_warning = sum(
                1 for entry in self.state_history[state_key]
                if entry['mem_percent'] >= self.MEMORY_WARNING and current_time - entry['time'] <= self.MEMORY_DURATION
            )
            swap_critical = sum(
                1 for entry in self.state_history[state_key]
                if entry['swap_vs_ram'] > 20 and current_time - entry['time'] <= self.SWAP_CRITICAL_DURATION
            )
            # At least 2 samples in the window are required before alerting.
            if mem_critical >= 2:
                status = 'CRITICAL'
                reason = f'RAM >90% for {self.MEMORY_DURATION}s'
            elif swap_critical >= 2:
                status = 'CRITICAL'
                reason = f'Swap >20% of RAM ({swap_vs_ram:.1f}%)'
            elif mem_warning >= 2:
                status = 'WARNING'
                reason = f'RAM >{self.MEMORY_WARNING}% for {self.MEMORY_DURATION}s'
            else:
                status = 'OK'
                reason = None
            ram_avail_gb = round(memory.available / (1024**3), 2)
            # NOTE(review): ram_total_gb and swap_total_gb are computed but not
            # included in the result -- confirm whether they are still needed.
            ram_total_gb = round(memory.total / (1024**3), 2)
            swap_used_gb = round(swap.used / (1024**3), 2)
            swap_total_gb = round(swap.total / (1024**3), 2)
            # Determine per-sub-check status
            ram_status = 'CRITICAL' if mem_percent >= 90 and mem_critical >= 2 else ('WARNING' if mem_percent >= self.MEMORY_WARNING and mem_warning >= 2 else 'OK')
            swap_status = 'CRITICAL' if swap_critical >= 2 else 'OK'
            result = {
                'status': status,
                'ram_percent': round(mem_percent, 1),
                'ram_available_gb': ram_avail_gb,
                'swap_percent': round(swap_percent, 1),
                'swap_used_gb': swap_used_gb,
                'checks': {
                    'ram_usage': {
                        'status': ram_status,
                        'detail': 'High RAM usage sustained' if ram_status != 'OK' else 'Normal'
                    },
                    'swap_usage': {
                        'status': swap_status,
                        'detail': 'Excessive swap usage' if swap_status != 'OK' else ('Normal' if swap.total > 0 else 'No swap configured')
                    }
                }
            }
            if reason:
                result['reason'] = reason
            return result
        except Exception as e:
            return {'status': 'UNKNOWN', 'reason': f'Memory check failed: {str(e)}'}

    def _check_storage_optimized(self) -> Dict[str, Any]:
        """
        Optimized storage check - monitors Proxmox storages from pvesm status.
        Checks for inactive storages, disk health from SMART/events, and ZFS pool health.
        """
        issues = []
        storage_details = {}
        # Check disk usage and mount status for important mounts.
        # We detect actual mountpoints dynamically rather than hard-coding.
        critical_mounts = set()
        critical_mounts.add('/')
        try:
            for part in psutil.disk_partitions(all=False):
                mp = part.mountpoint
                # Include standard system mounts and PVE storage
                if mp in ('/', '/var', '/tmp', '/boot', '/boot/efi') or \
                   mp.startswith('/var/lib/vz') or mp.startswith('/mnt/'):
                    critical_mounts.add(mp)
        except Exception:
            pass
        critical_mounts = sorted(critical_mounts)
        for mount_point in critical_mounts:
            try:
                result = subprocess.run(
                    ['mountpoint', '-q', mount_point],
                    capture_output=True, timeout=2
                )
                if result.returncode != 0:
                    issues.append(f'{mount_point}: Not mounted')
                    storage_details[mount_point] = {
                        'status': 'CRITICAL',
                        'reason': 'Not mounted'
                    }
                    continue
                # Check if read-only
                with open('/proc/mounts', 'r') as f:
                    for line in f:
                        parts = line.split()
                        if len(parts) >= 4 and parts[1] == mount_point:
                            options = parts[3].split(',')
                            if 'ro' in options:
                                issues.append(f'{mount_point}: Mounted read-only')
                                storage_details[mount_point] = {
                                    'status': 'CRITICAL',
                                    'reason': 'Mounted read-only'
                                }
                            break  # Found it, no need to check further for this mountpoint
                # Check filesystem usage only if not already flagged as critical
                if mount_point not in storage_details or storage_details[mount_point].get('status') == 'OK':
                    fs_status = self._check_filesystem(mount_point)
                    error_key = f'disk_space_{mount_point}'
                    if fs_status['status'] != 'OK':
                        issues.append(f"{mount_point}: {fs_status['reason']}")
                        storage_details[mount_point] = fs_status
                        # Record persistent error for notifications
                        usage = psutil.disk_usage(mount_point)
                        avail_gb = usage.free / (1024**3)
                        if avail_gb >= 1:
                            avail_str = f"{avail_gb:.1f} GiB"
                        else:
                            avail_str = f"{usage.free / (1024**2):.0f} MiB"
                        health_persistence.record_error(
                            error_key=error_key,
                            category='disk',
                            severity=fs_status['status'],
                            reason=f'{mount_point}: {fs_status["reason"]}',
                            details={
                                'mount': mount_point,
                                'used': str(round(usage.percent, 1)),
                                'available': avail_str,
                                'dismissable': False,
                            }
                        )
                    else:
                        # Space recovered -- clear any previous alert
                        health_persistence.clear_error(error_key)
            except Exception:
                pass  # Silently skip if mountpoint check fails
        # Check ZFS pool health status
        zfs_pool_issues = self._check_zfs_pool_health()
        if zfs_pool_issues:
            for pool_name, pool_info in zfs_pool_issues.items():
                issues.append(f'{pool_name}: {pool_info["reason"]}')
                storage_details[pool_name] = pool_info
        # Check disk health from Proxmox task log or system logs (SMART, etc.)
        disk_health_issues = self._check_disk_health_from_events()
        if disk_health_issues:
            for disk, issue in disk_health_issues.items():
                # Only add if not already covered by critical mountpoint issues
                if disk not in storage_details or storage_details[disk].get('status') == 'OK':
                    issues.append(f'{disk}: {issue["reason"]}')
                    storage_details[disk] = issue
        # Check LVM status
        lvm_status = self._check_lvm()
        if lvm_status.get('status') == 'WARNING':
            # LVM volumes might be okay but indicate potential issues
            issues.append(f"LVM check: {lvm_status.get('reason')}")
            storage_details['lvm_check'] = lvm_status
        # Check dmesg for real-time I/O errors (dmesg-based, complements journalctl SMART checks)
        dmesg_io_result = self._check_disks_optimized()
        if dmesg_io_result.get('status') != 'OK':
            dmesg_details = dmesg_io_result.get('details', {})
            for disk_path, disk_info in dmesg_details.items():
                if disk_path not in storage_details or storage_details[disk_path].get('status') == 'OK':
                    issues.append(f'{disk_path}: {disk_info.get("reason", "I/O errors")}')
                    storage_details[disk_path] = disk_info
        # Build checks dict from storage_details, adding OK entries for items with no issues
        checks = {}
        for key, val in storage_details.items():
            checks[key] = {
                'status': val.get('status', 'OK'),
                'detail': val.get('reason', 'OK'),
                **{k: v for k, v in val.items() if k not in ('status', 'reason')}
            }
        if not issues:
            # Add descriptive OK entries only for capabilities this server actually has
            checks['root_filesystem'] = checks.get('/', {'status': 'OK', 'detail': 'Mounted read-write, space OK'})
            checks['io_errors'] = {'status': 'OK', 'detail': 'No I/O errors in dmesg'}
            if self.capabilities.get('has_smart'):
                checks['smart_health'] = {'status': 'OK', 'detail': 'No SMART warnings in journal'}
            if self.capabilities.get('has_zfs'):
                checks['zfs_pools'] = {'status': 'OK', 'detail': 'ZFS pools healthy'}
            if self.capabilities.get('has_lvm'):
                checks['lvm_volumes'] = {'status': 'OK', 'detail': 'LVM volumes OK'}
            return {'status': 'OK', 'checks': checks}
        # Determine overall status
        has_critical = any(d.get('status') == 'CRITICAL' for d in storage_details.values())
        return {
            'status': 'CRITICAL' if has_critical else 'WARNING',
            'reason': '; '.join(issues[:3]),
            'details': storage_details,
            'checks': checks
        }

    def _check_filesystem(self, mount_point: str) -> Dict[str, Any]:
        """Check individual filesystem for space and mount status"""
        try:
            usage = psutil.disk_usage(mount_point)
            percent = usage.percent
            if percent >= self.STORAGE_CRITICAL:
                status = 'CRITICAL'
                reason = f'{percent:.1f}% full (≥{self.STORAGE_CRITICAL}%)'
            elif percent >= self.STORAGE_WARNING:
                status = 'WARNING'
                reason = f'{percent:.1f}% full (≥{self.STORAGE_WARNING}%)'
            else:
                status = 'OK'
                reason = None
            result = {
                'status': status,
                'usage_percent': round(percent, 1)
            }
            if reason:
                result['reason'] = reason
            return result
        except Exception as e:
            return {
                'status': 'WARNING',
                'reason': f'Check failed: {str(e)}'
            }

    def _check_lvm(self) -> Dict[str, Any]:
        """Check LVM volumes - improved detection"""
        try:
            # Check if lvs command is available
            result_which = subprocess.run(
                ['which', 'lvs'],
                capture_output=True, text=True, timeout=1
            )
            if result_which.returncode != 0:
                return {'status': 'OK'}  # LVM not installed
            result = subprocess.run(
                ['lvs', '--noheadings', '--options', 'lv_name,vg_name,lv_attr'],
                capture_output=True, text=True, timeout=3
            )
            if result.returncode != 0:
                return {'status': 'WARNING', 'reason': 'lvs command failed'}
            volumes = []
            for line in result.stdout.strip().split('\n'):
                if line.strip():
                    parts = line.split()
                    # NOTE(review): guard is len(parts) >= 2 but parts[2] is
                    # indexed below -- IndexError if a line has exactly 2
                    # fields; verify lvs always emits the lv_attr column.
                    if len(parts) >= 2:
                        lv_name = parts[0].strip()
                        vg_name = parts[1].strip()
                        # Check for 'a' attribute indicating active/available
                        if 'a' in parts[2]:
                            volumes.append(f'{vg_name}/{lv_name}')
            # If LVM is configured but no active volumes are found, it might be an issue or just not used
            if not volumes:
                # Check if any VGs exist to determine if LVM is truly unconfigured or just inactive
                vg_result = subprocess.run(
                    ['vgs', '--noheadings', '--options', 'vg_name'],
                    capture_output=True, text=True, timeout=3
                )
                if vg_result.returncode == 0 and vg_result.stdout.strip():
                    return {'status': 'WARNING', 'reason': 'No active LVM volumes detected'}
                else:
                    return {'status': 'OK'}  # No VGs found, LVM not in use
            return {'status': 'OK', 'volumes': len(volumes)}
        except Exception:
            return {'status': 'OK'}

    # This function is no longer used in get_detailed_status, but kept for reference if needed.
    # The new _check_proxmox_storage function handles this logic better.
    def _check_proxmox_storages(self) -> Dict[str, Any]:
        """Check Proxmox-specific storages (only report problems)"""
        storages = {}
        try:
            if os.path.exists('/etc/pve/storage.cfg'):
                with open('/etc/pve/storage.cfg', 'r') as f:
                    current_storage = None
                    storage_type = None
                    for line in f:
                        line = line.strip()
                        # A "<type>: <name>" header starts a new storage stanza.
                        if line.startswith('dir:') or line.startswith('nfs:') or \
                           line.startswith('cifs:') or line.startswith('pbs:') or \
                           line.startswith('rbd:') or line.startswith('cephfs:') or \
                           line.startswith('zfs:') or line.startswith('zfs-send:'):
                            parts = line.split(':', 1)
                            storage_type = parts[0]
                            current_storage = parts[1].strip()
                        elif line.startswith('path ') and current_storage:
                            path = line.split(None, 1)[1]
                            # Only 'dir' storages are validated: their path must exist.
                            if storage_type == 'dir':
                                if not os.path.exists(path):
                                    storages[f'storage_{current_storage}'] = {
                                        'status': 'CRITICAL',
                                        'reason': 'Directory does not exist',
                                        'type': 'dir',
                                        'path': path
                                    }
                            current_storage = None
                            storage_type = None
        except Exception:
            pass
        return storages

    def _resolve_ata_to_disk(self, ata_port: str) -> str:
        """Resolve an ATA controller name (e.g. 'ata8') to a block device (e.g. 'sda').
        Uses /sys/class/ata_port/ symlinks and /sys/block/ to find the mapping.
        Falls back to parsing dmesg for 'ata8: SATA link up' -> 'sd 7:0:0:0: [sda]'.
""" if not ata_port or not ata_port.startswith('ata'): return ata_port port_num = ata_port.replace('ata', '') # Method 1: Walk /sys/class/ata_port/ -> host -> target -> block try: ata_path = f'/sys/class/ata_port/{ata_port}' if os.path.exists(ata_path): device_path = os.path.realpath(ata_path) # Walk up to find the SCSI host, then find block devices # Path: /sys/devices/.../ataX/hostY/targetY:0:0/Y:0:0:0/block/sdZ for root, dirs, files in os.walk(os.path.dirname(device_path)): if 'block' in dirs: block_path = os.path.join(root, 'block') devs = os.listdir(block_path) if devs: return devs[0] # e.g. 'sda' except (OSError, IOError): pass # Method 2: Parse dmesg for ATA link messages try: result = subprocess.run( ['dmesg', '--notime'], capture_output=True, text=True, timeout=2 ) if result.returncode == 0: # Look for "ata8: SATA link up" followed by "sd X:0:0:0: [sda]" lines = result.stdout.split('\n') host_num = None for line in lines: m = re.search(rf'{ata_port}:\s+SATA link', line) if m: # ata port number maps to host(N-1) typically host_num = int(port_num) - 1 if host_num is not None: m2 = re.search(rf'sd\s+{host_num}:\d+:\d+:\d+:\s+\[(\w+)\]', line) if m2: return m2.group(1) except (OSError, subprocess.TimeoutExpired): pass return ata_port # Return original if resolution fails def _identify_block_device(self, device: str) -> str: """ Identify a block device by querying lsblk. Returns a human-readable string like: "KINGSTON SA400S37960G (SSD, 894.3G) mounted at /mnt/data" Returns empty string if the device is not found in lsblk. 
""" if not device or device == 'unknown': return '' try: candidates = [device] base = re.sub(r'\d+$', '', device) if not ('nvme' in device or 'mmcblk' in device) else device if base != device: candidates.append(base) for dev in candidates: dev_path = f'/dev/{dev}' if not dev.startswith('/') else dev result = subprocess.run( ['lsblk', '-ndo', 'NAME,MODEL,SIZE,TRAN,MOUNTPOINT,ROTA', dev_path], capture_output=True, text=True, timeout=3 ) if result.returncode == 0 and result.stdout.strip(): fields = result.stdout.strip().split(None, 5) name = fields[0] if len(fields) > 0 else dev model = fields[1] if len(fields) > 1 and fields[1] else 'Unknown model' size = fields[2] if len(fields) > 2 else '?' tran = (fields[3] if len(fields) > 3 else '').upper() mountpoint = fields[4] if len(fields) > 4 and fields[4] else '' rota = fields[5].strip() if len(fields) > 5 else '1' if tran == 'USB': disk_type = 'USB' elif tran == 'NVME' or 'nvme' in name: disk_type = 'NVMe' elif rota == '0': disk_type = 'SSD' else: disk_type = 'HDD' info = f'{model} ({disk_type}, {size})' if mountpoint: info += f' mounted at {mountpoint}' elif dev != device: part_result = subprocess.run( ['lsblk', '-ndo', 'MOUNTPOINT', f'/dev/{device}'], capture_output=True, text=True, timeout=2 ) part_mount = part_result.stdout.strip() if part_result.returncode == 0 else '' if part_mount: info += f' partition {device} mounted at {part_mount}' else: info += ' -- not mounted' else: info += ' -- not mounted' return info return '' except Exception: return '' def _quick_smart_health(self, disk_name: str) -> str: """Quick SMART health check for a single disk. 
Returns 'PASSED', 'FAILED', or 'UNKNOWN'.""" if not disk_name or disk_name.startswith('ata') or disk_name.startswith('zram'): return 'UNKNOWN' try: dev_path = f'/dev/{disk_name}' if not disk_name.startswith('/') else disk_name result = subprocess.run( ['smartctl', '--health', '-j', dev_path], capture_output=True, text=True, timeout=5 ) import json as _json data = _json.loads(result.stdout) passed = data.get('smart_status', {}).get('passed', None) if passed is True: return 'PASSED' elif passed is False: return 'FAILED' return 'UNKNOWN' except Exception: return 'UNKNOWN' def _check_all_disks_smart(self, fallback: str = 'UNKNOWN') -> str: """Check SMART health of ALL physical disks. Used when an ATA port can't be resolved to a specific /dev/sdX. If ALL disks report PASSED, returns 'PASSED' (errors are transient). If ANY disk reports FAILED, returns 'FAILED'. Otherwise returns the fallback value. """ try: # List all block devices (exclude partitions, loop, zram, dm) result = subprocess.run( ['lsblk', '-dnpo', 'NAME,TYPE'], capture_output=True, text=True, timeout=3 ) if result.returncode != 0: return fallback disks = [] for line in result.stdout.strip().split('\n'): parts = line.split() if len(parts) >= 2 and parts[1] == 'disk': disks.append(parts[0]) # e.g. /dev/sda if not disks: return fallback all_passed = True any_failed = False checked = 0 for dev in disks: health = self._quick_smart_health(dev) if health == 'PASSED': checked += 1 elif health == 'FAILED': any_failed = True break else: all_passed = False # Can't confirm this disk if any_failed: return 'FAILED' if all_passed and checked > 0: return 'PASSED' return fallback except Exception: return fallback def _check_disks_optimized(self) -> Dict[str, Any]: """ Disk I/O error check -- the SINGLE source of truth for disk errors. Reads dmesg for I/O/ATA/SCSI errors, counts per device, records in health_persistence, and returns status for the health dashboard. 
        Resolves ATA controller names (ata8) to physical disks (sda).
        Cross-references SMART health to avoid false positives from transient
        ATA controller errors. If SMART reports PASSED, dmesg errors are
        downgraded to INFO (transient).
        """
        current_time = time.time()
        disk_results = {}  # Single dict for both WARNING and CRITICAL
        # Common transient ATA patterns that auto-recover and are not real disk failures.
        # These are bus/controller level events, NOT media errors:
        #   action 0x0 = no action needed (fully recovered)
        #   action 0x6 = hard reset + port reinit (common cable/connector recovery)
        #   SError with BadCRC/Dispar = signal integrity issue (cable, not disk)
        #   Emask 0x10 = ATA bus error (controller/interconnect, not media)
        TRANSIENT_PATTERNS = [
            re.compile(r'exception\s+emask.*action\s+0x[06]', re.IGNORECASE),
            re.compile(r'serror.*=.*0x[0-9a-f]+\s*\(', re.IGNORECASE),
            re.compile(r'SError:.*\{.*(?:BadCRC|Dispar|CommWake).*\}', re.IGNORECASE),
            re.compile(r'emask\s+0x10\s+\(ATA bus error\)', re.IGNORECASE),
            re.compile(r'failed command:\s*READ FPDMA QUEUED', re.IGNORECASE),
        ]
        try:
            # Check dmesg for I/O errors in the last 5 minutes
            result = subprocess.run(
                ['dmesg', '-T', '--level=err,warn', '--since', '5 minutes ago'],
                capture_output=True,
                text=True,
                timeout=2
            )
            # Collect a sample line per device for richer error messages
            disk_samples = {}
            # Track if ALL errors for a device are transient patterns
            disk_transient_only = {}
            if result.returncode == 0:
                for line in result.stdout.split('\n'):
                    line_lower = line.lower()
                    # Detect various disk error formats
                    is_disk_error = any(kw in line_lower for kw in [
                        'i/o error',
                        'scsi error',
                        'medium error',
                        'failed command:',
                        'exception emask',
                    ])
                    ata_match = re.search(r'(ata\d+)[\.\d]*:.*(?:error|failed|exception)', line_lower)
                    if ata_match:
                        is_disk_error = True
                    if is_disk_error:
                        # Check if this specific line is a known transient pattern
                        is_transient = any(p.search(line) for p in TRANSIENT_PATTERNS)
                        # Extract device from multiple formats
                        raw_device = None
                        for dev_re in [
                            r'dev\s+(sd[a-z]+)',        # dev sdb
                            r'\[(sd[a-z]+)\]',          # [sda]
                            r'/dev/(sd[a-z]+)',         # /dev/sda
                            r'(nvme\d+n\d+)',           # nvme0n1
                            r'device\s+(sd[a-z]+\d*)',  # device sda1
                            r'(ata\d+)',                # ata8 (ATA controller)
                        ]:
                            dm = re.search(dev_re, line)
                            if dm:
                                raw_device = dm.group(1)
                                break
                        if raw_device:
                            # Resolve ATA port to physical disk name
                            if raw_device.startswith('ata'):
                                resolved = self._resolve_ata_to_disk(raw_device)
                                disk_name = resolved
                            else:
                                # Normalize partitions to their base disk (sda1 -> sda)
                                disk_name = raw_device.rstrip('0123456789') if raw_device.startswith('sd') else raw_device
                            self.io_error_history[disk_name].append(current_time)
                            if disk_name not in disk_samples:
                                # Strip the dmesg timestamp prefix for display
                                clean = re.sub(r'^\[.*?\]\s*', '', line.strip())
                                disk_samples[disk_name] = clean[:200]
                            # Track transient status: if ANY non-transient error is found, mark False
                            if disk_name not in disk_transient_only:
                                disk_transient_only[disk_name] = is_transient
                            elif not is_transient:
                                disk_transient_only[disk_name] = False
            # Clean old history and evaluate per-disk status
            for disk in list(self.io_error_history.keys()):
                # Keep only events from the last 5 minutes (sliding window)
                self.io_error_history[disk] = [
                    t for t in self.io_error_history[disk]
                    if current_time - t < 300
                ]
                error_count = len(self.io_error_history[disk])
                error_key = f'disk_{disk}'
                sample = disk_samples.get(disk, '')
                display = f'/dev/{disk}' if not disk.startswith('/') else disk
                all_transient = disk_transient_only.get(disk, False)
                if error_count >= 1:
                    # Cross-reference with SMART to determine real severity
                    smart_health = self._quick_smart_health(disk)
                    # If SMART is UNKNOWN (unresolved ATA port), check ALL
                    # physical disks. If every disk passes SMART, the ATA
                    # errors are transient bus/controller noise.
                    if smart_health == 'UNKNOWN':
                        smart_health = self._check_all_disks_smart(smart_health)
                    smart_ok = smart_health == 'PASSED'
                    # Transient-only errors (e.g. SError with auto-recovery)
                    # are always INFO regardless of SMART
                    if all_transient:
                        reason = f'{display}: {error_count} transient ATA event(s) in 5 min (auto-recovered)'
                        if sample:
                            reason += f'\n{sample}'
                        health_persistence.resolve_error(error_key, 'Transient ATA events, auto-recovered')
                        disk_results[display] = {
                            'status': 'INFO',
                            'reason': reason,
                            'device': disk,
                            'error_count': error_count,
                            'smart_status': smart_health,
                            'dismissable': False,
                            'error_key': error_key,
                        }
                    elif smart_ok:
                        # SMART is healthy -> dmesg errors are informational only
                        # The disk is fine; these are transient controller/bus events
                        reason = f'{display}: {error_count} I/O event(s) in 5 min (SMART: OK)'
                        if sample:
                            reason += f'\n{sample}'
                        # Resolve any previous error since SMART confirms disk is healthy
                        health_persistence.resolve_error(error_key, 'SMART healthy, I/O events are transient')
                        disk_results[display] = {
                            'status': 'INFO',
                            'reason': reason,
                            'device': disk,
                            'error_count': error_count,
                            'smart_status': smart_health,
                            'dismissable': False,
                            'error_key': error_key,
                        }
                    elif smart_health == 'FAILED':
                        # SMART confirms a real disk failure
                        severity = 'CRITICAL'
                        reason = f'{display}: {error_count} I/O error(s) in 5 min (SMART: FAILED)'
                        if sample:
                            reason += f'\n{sample}'
                        health_persistence.record_error(
                            error_key=error_key,
                            category='disks',
                            severity=severity,
                            reason=reason,
                            details={'disk': disk, 'device': display,
                                     'error_count': error_count,
                                     'smart_status': smart_health,
                                     'sample': sample,
                                     'dismissable': False}
                        )
                        disk_results[display] = {
                            'status': severity,
                            'reason': reason,
                            'device': disk,
                            'error_count': error_count,
                            'smart_status': smart_health,
                            'dismissable': False,
                            'error_key': error_key,
                        }
                    else:
                        # SMART is genuinely UNKNOWN (no disk resolved, no
                        # smartctl at all) -- treat as WARNING, not CRITICAL.
                        # These are likely transient and will auto-resolve.
                        severity = 'WARNING'
                        reason = f'{display}: {error_count} I/O event(s) in 5 min (SMART: unavailable)'
                        if sample:
                            reason += f'\n{sample}'
                        # Only record to persistence ONCE. If the error is
                        # already active, don't call record_error again --
                        # that would keep updating last_seen and preventing
                        # the freshness check from detecting it as stale.
                        if not health_persistence.is_error_active(error_key, category='disks'):
                            health_persistence.record_error(
                                error_key=error_key,
                                category='disks',
                                severity=severity,
                                reason=reason,
                                details={'disk': disk, 'device': display,
                                         'error_count': error_count,
                                         'smart_status': smart_health,
                                         'sample': sample,
                                         'dismissable': True}
                            )
                        disk_results[display] = {
                            'status': severity,
                            'reason': reason,
                            'device': disk,
                            'error_count': error_count,
                            'smart_status': smart_health,
                            'dismissable': True,
                            'error_key': error_key,
                        }
                else:
                    # Window emptied -> the error condition has cleared
                    health_persistence.resolve_error(error_key, 'Disk errors cleared')
            # Also include active filesystem errors (detected by _check_log_analysis
            # and cross-referenced to the 'disks' category)
            try:
                fs_errors = health_persistence.get_active_errors(category='disks')
                for err in fs_errors:
                    err_key = err.get('error_key', '')
                    if not err_key.startswith('disk_fs_'):
                        continue  # Only filesystem cross-references
                    # Skip acknowledged/dismissed errors
                    if err.get('acknowledged') == 1:
                        continue
                    details = err.get('details', {})
                    if isinstance(details, str):
                        # Persistence may store details as a JSON string
                        try:
                            import json as _json
                            details = _json.loads(details)
                        except Exception:
                            details = {}
                    device = details.get('device', err_key.replace('disk_fs_', '/dev/'))
                    base_disk = details.get('disk', '')
                    # Check if the device still exists. If not, auto-resolve
                    # the error -- it was likely a disconnected USB/temp device.
                    dev_path = f'/dev/{base_disk}' if base_disk else device
                    if not os.path.exists(dev_path):
                        health_persistence.resolve_error(
                            err_key, 'Device no longer present in system')
                        continue
                    # Cross-reference with SMART: if SMART is healthy for
                    # this disk, downgrade to INFO (transient fs error).
                    severity = err.get('severity', 'WARNING')
                    if base_disk:
                        smart_health = self._quick_smart_health(base_disk)
                        if smart_health == 'PASSED' and severity == 'CRITICAL':
                            severity = 'WARNING'
                    if device not in disk_results:
                        disk_results[device] = {
                            'status': severity,
                            'reason': err.get('reason', 'Filesystem error'),
                            'device': base_disk,
                            'error_count': 1,
                            'error_type': 'filesystem',
                            'dismissable': True,
                            'error_key': err_key,
                        }
            except Exception:
                pass
            if not disk_results:
                return {'status': 'OK'}
            # Overall status: only count WARNING+ (skip INFO)
            active_results = {k: v for k, v in disk_results.items()
                              if v.get('status') not in ('OK', 'INFO')}
            if not active_results:
                return {
                    'status': 'OK',
                    'reason': 'Transient ATA events only (SMART healthy)',
                    'details': disk_results
                }
            has_critical = any(d.get('status') == 'CRITICAL' for d in active_results.values())
            return {
                'status': 'CRITICAL' if has_critical else 'WARNING',
                'reason': f"{len(active_results)} disk(s) with errors",
                'details': disk_results
            }
        except Exception as e:
            print(f"[HealthMonitor] Disk/IO check failed: {e}")
            return {'status': 'UNKNOWN', 'reason': f'Disk check unavailable: {str(e)}', 'checks': {}}

    def _check_network_optimized(self) -> Dict[str, Any]:
        """
        Optimized network check - only alerts for interfaces that are actually in use.
        Avoids false positives for unused physical interfaces.
""" try: issues = [] interface_details = {} net_if_stats = psutil.net_if_stats() try: net_io_per_nic = psutil.net_io_counters(pernic=True) except Exception: net_io_per_nic = {} try: net_if_addrs = psutil.net_if_addrs() except Exception: net_if_addrs = {} active_interfaces = set() for interface, stats in net_if_stats.items(): if interface == 'lo': continue # Check if important interface is down if not stats.isup: should_alert = False alert_reason = None # Check if it's a bridge interface (always important for VMs/LXCs) if interface.startswith('vmbr'): should_alert = True alert_reason = 'Bridge interface DOWN (VMs/LXCs may be affected)' # Check if physical interface has configuration or traffic elif interface.startswith(('eth', 'ens', 'enp', 'eno')): # Check if interface has IP address (configured) has_ip = False if interface in net_if_addrs: for addr in net_if_addrs[interface]: if addr.family == 2: # IPv4 has_ip = True break # Check if interface has traffic (has been used) has_traffic = False if interface in net_io_per_nic: io_stats = net_io_per_nic[interface] # If interface has sent or received any data, it's being used if io_stats.bytes_sent > 0 or io_stats.bytes_recv > 0: has_traffic = True # Only alert if interface is configured or has been used if has_ip: should_alert = True alert_reason = 'Configured interface DOWN (has IP address)' elif has_traffic: should_alert = True alert_reason = 'Active interface DOWN (was handling traffic)' if should_alert: issues.append(f'{interface} is DOWN') error_key = interface health_persistence.record_error( error_key=error_key, category='network', severity='CRITICAL', reason=alert_reason or 'Interface DOWN', details={'interface': interface, 'dismissable': False} ) interface_details[interface] = { 'status': 'CRITICAL', 'reason': alert_reason or 'Interface DOWN', 'dismissable': False } else: active_interfaces.add(interface) if interface.startswith('vmbr') or interface.startswith(('eth', 'ens', 'enp', 'eno')): 
health_persistence.resolve_error(interface, 'Interface recovered') # Check connectivity (latency) latency_status = self._check_network_latency() if latency_status: latency_ms = latency_status.get('latency_ms', 'N/A') latency_sev = latency_status.get('status', 'OK') interface_details['connectivity'] = latency_status connectivity_check = { 'status': latency_sev if latency_sev not in ['UNKNOWN'] else 'OK', 'detail': f'Latency {latency_ms}ms to 1.1.1.1' if isinstance(latency_ms, (int, float)) else latency_status.get('reason', 'Unknown'), } if latency_sev not in ['OK', 'INFO', 'UNKNOWN']: issues.append(latency_status.get('reason', 'Network latency issue')) else: connectivity_check = {'status': 'OK', 'detail': 'Not tested'} # Build checks dict checks = {} for iface in active_interfaces: checks[iface] = {'status': 'OK', 'detail': 'UP'} for iface, detail in interface_details.items(): if iface != 'connectivity': checks[iface] = { 'status': detail.get('status', 'OK'), 'detail': detail.get('reason', 'DOWN'), 'dismissable': detail.get('dismissable', False) } checks['connectivity'] = connectivity_check if not issues: return {'status': 'OK', 'checks': checks} has_critical = any(d.get('status') == 'CRITICAL' for d in interface_details.values()) return { 'status': 'CRITICAL' if has_critical else 'WARNING', 'reason': '; '.join(issues[:2]), 'details': interface_details, 'checks': checks } except Exception as e: print(f"[HealthMonitor] Network check failed: {e}") return {'status': 'UNKNOWN', 'reason': f'Network check unavailable: {str(e)}', 'checks': {}} def _check_network_latency(self) -> Optional[Dict[str, Any]]: """Check network latency to 1.1.1.1 (cached)""" cache_key = 'network_latency' current_time = time.time() if cache_key in self.last_check_times: if current_time - self.last_check_times[cache_key] < 60: return self.cached_results.get(cache_key) try: result = subprocess.run( ['ping', '-c', '1', '-W', '1', '1.1.1.1'], capture_output=True, text=True, 
timeout=self.NETWORK_TIMEOUT ) if result.returncode == 0: for line in result.stdout.split('\n'): if 'time=' in line: try: latency_str = line.split('time=')[1].split()[0] latency = float(latency_str) if latency > self.NETWORK_LATENCY_CRITICAL: status = 'CRITICAL' reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_CRITICAL}ms' elif latency > self.NETWORK_LATENCY_WARNING: status = 'WARNING' reason = f'Latency {latency:.1f}ms >{self.NETWORK_LATENCY_WARNING}ms' else: status = 'OK' reason = None latency_result = { 'status': status, 'latency_ms': round(latency, 1) } if reason: latency_result['reason'] = reason self.cached_results[cache_key] = latency_result self.last_check_times[cache_key] = current_time return latency_result except: pass # If ping failed (timeout, unreachable) - distinguish the reason stderr_lower = (result.stderr or '').lower() if hasattr(result, 'stderr') else '' if 'unreachable' in stderr_lower or 'network is unreachable' in stderr_lower: fail_reason = 'Network unreachable - no route to 1.1.1.1' elif result.returncode == 1: fail_reason = 'Packet loss to 1.1.1.1 (100% loss)' else: fail_reason = f'Ping failed (exit code {result.returncode})' packet_loss_result = { 'status': 'CRITICAL', 'reason': fail_reason } self.cached_results[cache_key] = packet_loss_result self.last_check_times[cache_key] = current_time return packet_loss_result except subprocess.TimeoutExpired: timeout_result = { 'status': 'WARNING', 'reason': f'Ping timeout (>{self.NETWORK_TIMEOUT}s) - possible high latency' } self.cached_results[cache_key] = timeout_result self.last_check_times[cache_key] = current_time return timeout_result except Exception: return {'status': 'UNKNOWN', 'reason': 'Ping command failed'} def _is_vzdump_active(self) -> bool: """Check if a vzdump (backup) job is currently running.""" try: with open('/var/log/pve/tasks/active', 'r') as f: for line in f: if ':vzdump:' in line: return True except (OSError, IOError): pass return False def _resolve_vm_name(self, 
                         vmid: str) -> str:
        """Resolve VMID to guest name from PVE config files."""
        if not vmid:
            return ''
        # QEMU VMs use 'name:', LXC containers use 'hostname:'
        for base in ['/etc/pve/qemu-server', '/etc/pve/lxc']:
            conf = os.path.join(base, f'{vmid}.conf')
            try:
                with open(conf) as f:
                    for line in f:
                        if line.startswith('hostname:') or line.startswith('name:'):
                            return line.split(':', 1)[1].strip()
            except (OSError, IOError):
                continue
        return ''

    def _check_vms_cts_optimized(self) -> Dict[str, Any]:
        """
        Optimized VM/CT check - detects qmp failures and startup errors from logs.
        Improved detection of container and VM errors from journalctl.
        """
        try:
            # First: auto-resolve any persisted VM/CT errors where the guest
            # is now running. This clears stale "Failed to start" / QMP
            # errors that are no longer relevant.
            try:
                active_vm_errors = health_persistence.get_active_errors('vms')
                for err in active_vm_errors:
                    details = err.get('details') or {}
                    vmid = details.get('id', '')
                    if vmid:
                        health_persistence.check_vm_running(vmid)
            except Exception:
                pass
            issues = []
            vm_details = {}
            result = subprocess.run(
                ['journalctl', '--since', '10 minutes ago', '--no-pager', '-p', 'warning'],
                capture_output=True,
                text=True,
                timeout=3
            )
            # Check if vzdump is running -- QMP timeouts during backup are normal
            _vzdump_running = self._is_vzdump_active()
            if result.returncode == 0:
                for line in result.stdout.split('\n'):
                    line_lower = line.lower()
                    vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command.*(?:failed|unable|timeout)', line_lower)
                    if vm_qmp_match:
                        if _vzdump_running:
                            continue  # Normal during backup
                        vmid = vm_qmp_match.group(1)
                        vm_name = self._resolve_vm_name(vmid)
                        display = f"VM {vmid} ({vm_name})" if vm_name else f"VM {vmid}"
                        key = f'vm_{vmid}'
                        if key not in vm_details:
                            issues.append(f'{display}: QMP communication issue')
                            vm_details[key] = {
                                'status': 'WARNING',
                                'reason': f'{display}: QMP command failed or timed out.\n{line.strip()[:200]}',
                                'id': vmid,
                                'vmname': vm_name,
                                'type': 'VM'
                            }
                        continue
                    ct_error_match = re.search(r'(?:ct|container|lxc)\s+(\d+)', line_lower)
                    if ct_error_match and ('error' in line_lower or 'fail' in line_lower or 'device' in line_lower):
                        ctid = ct_error_match.group(1)
                        key = f'ct_{ctid}'
                        if key not in vm_details:
                            if 'device' in line_lower and 'does not exist' in line_lower:
                                device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
                                if device_match:
                                    reason = f'Device {device_match.group(1)} missing'
                                else:
                                    reason = 'Device error'
                            elif 'failed to start' in line_lower:
                                reason = 'Failed to start'
                            else:
                                reason = 'Container error'
                            ct_name = self._resolve_vm_name(ctid)
                            display = f"CT {ctid} ({ct_name})" if ct_name else f"CT {ctid}"
                            full_reason = f'{display}: {reason}\n{line.strip()[:200]}'
                            issues.append(f'{display}: {reason}')
                            # Missing passthrough devices are WARNING; anything
                            # else (e.g. failure to start) is CRITICAL
                            vm_details[key] = {
                                'status': 'WARNING' if 'device' in reason.lower() else 'CRITICAL',
                                'reason': full_reason,
                                'id': ctid,
                                'vmname': ct_name,
                                'type': 'CT'
                            }
                        continue
                    vzstart_match = re.search(r'vzstart:(\d+):', line)
                    if vzstart_match and ('error' in line_lower or 'fail' in line_lower or 'does not exist' in line_lower):
                        ctid = vzstart_match.group(1)
                        key = f'ct_{ctid}'
                        if key not in vm_details:
                            # Extract the error message
                            if 'device' in line_lower and 'does not exist' in line_lower:
                                device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
                                if device_match:
                                    reason = f'Device {device_match.group(1)} missing'
                                else:
                                    reason = 'Device error'
                            else:
                                reason = 'Startup error'
                            issues.append(f'CT {ctid}: {reason}')
                            vm_details[key] = {
                                'status': 'WARNING',
                                'reason': reason,
                                'id': ctid,
                                'type': 'CT'
                            }
                        continue
                    if any(keyword in line_lower for keyword in ['failed to start', 'cannot start', 'activation failed', 'start error']):
                        # Heuristic: a 3-4 digit number is assumed to be a VMID
                        id_match = re.search(r'\b(\d{3,4})\b', line)
                        if id_match:
                            vmid = id_match.group(1)
                            key = f'vmct_{vmid}'
                            if key not in vm_details:
                                vm_name = self._resolve_vm_name(vmid)
                                display = f"VM/CT {vmid} ({vm_name})" if vm_name else f"VM/CT {vmid}"
                                full_reason = f'{display}: Failed to start\n{line.strip()[:200]}'
                                issues.append(f'{display}: Failed to start')
                                vm_details[key] = {
                                    'status': 'CRITICAL',
                                    'reason': full_reason,
                                    'id': vmid,
                                    'vmname': vm_name,
                                    'type': 'VM/CT'
                                }
            if not issues:
                return {'status': 'OK'}
            has_critical = any(d.get('status') == 'CRITICAL' for d in vm_details.values())
            return {
                'status': 'CRITICAL' if has_critical else 'WARNING',
                'reason': '; '.join(issues[:3]),
                'details': vm_details
            }
        except Exception as e:
            print(f"[HealthMonitor] VMs/CTs check failed: {e}")
            return {'status': 'UNKNOWN', 'reason': f'VM/CT check unavailable: {str(e)}', 'checks': {}}

    # Modified to use persistence
    def _check_vms_cts_with_persistence(self) -> Dict[str, Any]:
        """
        Check VMs/CTs with persistent error tracking.
        Errors persist until VM starts or 48h elapsed.
        """
        try:
            issues = []
            vm_details = {}
            # Get persistent errors first
            persistent_errors = health_persistence.get_active_errors('vms')
            # Check if any persistent VMs/CTs have started or were dismissed
            for error in persistent_errors:
                error_key = error['error_key']
                is_acknowledged = error.get('acknowledged') == 1
                if error_key.startswith(('vm_', 'ct_', 'vmct_')):
                    vm_id = error_key.split('_', 1)[1]
                    # Check if VM is running using persistence helper
                    if health_persistence.check_vm_running(vm_id):
                        continue  # Error auto-resolved if VM is now running
                    # Still active, add to details
                    vm_details[error_key] = {
                        'status': error['severity'],
                        'reason': error['reason'],
                        'id': error.get('details', {}).get('id', 'unknown'),
                        'type': error.get('details', {}).get('type', 'VM/CT'),
                        'first_seen': error['first_seen'],
                        'dismissed': is_acknowledged,
                    }
                    # Only add to issues if not dismissed
                    if not is_acknowledged:
                        issues.append(f"{error.get('details', {}).get('type', 'VM')} {error.get('details', {}).get('id', '')}: {error['reason']}")
            # Check for new errors in logs
            # Using 'warning' priority to catch potential startup issues
            result = subprocess.run(
                ['journalctl', '--since', '10 minutes ago', '--no-pager', '-p', 'warning'],
                capture_output=True,
                text=True,
                timeout=3
            )
            _vzdump_running = self._is_vzdump_active()
            if result.returncode == 0:
                for line in result.stdout.split('\n'):
                    line_lower = line.lower()
                    # VM QMP errors (skip during active backup -- normal behavior)
                    vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command.*(?:failed|unable|timeout)', line_lower)
                    if vm_qmp_match:
                        if _vzdump_running:
                            continue  # Normal during backup
                        vmid = vm_qmp_match.group(1)
                        vm_name = self._resolve_vm_name(vmid)
                        display = f"VM {vmid} ({vm_name})" if vm_name else f"VM {vmid}"
                        error_key = f'vm_{vmid}'
                        if error_key not in vm_details:
                            rec_result = health_persistence.record_error(
                                error_key=error_key,
                                category='vms',
                                severity='WARNING',
                                reason=f'{display}: QMP command failed or timed out.\n{line.strip()[:200]}',
                                details={'id': vmid, 'vmname': vm_name, 'type': 'VM'}
                            )
                            # Respect user dismissals recorded in persistence
                            if not rec_result or rec_result.get('type') != 'skipped_acknowledged':
                                issues.append(f'{display}: QMP communication issue')
                                vm_details[error_key] = {
                                    'status': 'WARNING',
                                    'reason': f'{display}: QMP command failed or timed out',
                                    'id': vmid,
                                    'vmname': vm_name,
                                    'type': 'VM'
                                }
                        continue
                    # Container errors (including startup issues via vzstart)
                    vzstart_match = re.search(r'vzstart:(\d+):', line)
                    if vzstart_match and ('error' in line_lower or 'fail' in line_lower or 'does not exist' in line_lower):
                        ctid = vzstart_match.group(1)
                        error_key = f'ct_{ctid}'
                        if error_key not in vm_details:
                            if 'device' in line_lower and 'does not exist' in line_lower:
                                device_match = re.search(r'device\s+([/\w\d]+)\s+does not exist', line_lower)
                                if device_match:
                                    reason = f'Device {device_match.group(1)} missing'
                                else:
                                    reason = 'Device error'
                            else:
                                reason = 'Startup error'
                            # Record persistent error
                            rec_result = health_persistence.record_error(
                                error_key=error_key,
                                category='vms',
                                severity='WARNING',
                                reason=reason,
                                details={'id': ctid, 'type': 'CT'}
                            )
                            if not rec_result or rec_result.get('type') != 'skipped_acknowledged':
                                issues.append(f'CT {ctid}: {reason}')
                                vm_details[error_key] = {
                                    'status': 'WARNING',
                                    'reason': reason,
                                    'id': ctid,
                                    'type': 'CT'
                                }
                    # Generic failed to start for VMs and CTs
                    if any(keyword in line_lower for keyword in ['failed to start', 'cannot start', 'activation failed', 'start error']):
                        # Try contextual VMID patterns first (more precise), then fallback to generic
                        id_match = (
                            re.search(r'(?:VMID|vmid|VM|CT|qemu|lxc|pct|qm)[:\s=/]+(\d{3,5})\b', line)
                            or re.search(r'\b(\d{3,5})\.conf\b', line)
                            or re.search(r'\b(\d{3,5})\b', line)
                        )
                        if id_match:
                            vmid_ctid = id_match.group(1)
                            # Determine if it's a VM or CT based on context, if possible
                            if 'vm' in line_lower or 'qemu' in line_lower:
                                error_key = f'vm_{vmid_ctid}'
                                vm_type = 'VM'
                            elif 'ct' in line_lower or 'lxc' in line_lower:
                                error_key = f'ct_{vmid_ctid}'
                                vm_type = 'CT'
                            else:
                                # Fallback if type is unclear
                                error_key = f'vmct_{vmid_ctid}'
                                vm_type = 'VM/CT'
                            if error_key not in vm_details:
                                vm_name = self._resolve_vm_name(vmid_ctid)
                                display = f"{vm_type} {vmid_ctid}"
                                if vm_name:
                                    display = f"{vm_type} {vmid_ctid} ({vm_name})"
                                reason = f'{display}: Failed to start\n{line.strip()[:200]}'
                                # Record persistent error
                                rec_result = health_persistence.record_error(
                                    error_key=error_key,
                                    category='vms',
                                    severity='CRITICAL',
                                    reason=reason,
                                    details={'id': vmid_ctid, 'vmname': vm_name, 'type': vm_type}
                                )
                                if not rec_result or rec_result.get('type') != 'skipped_acknowledged':
                                    issues.append(f'{display}: Failed to start')
                                    vm_details[error_key] = {
                                        'status': 'CRITICAL',
                                        'reason': reason,
                                        'id': vmid_ctid,
                                        'vmname': vm_name,
                                        'type': vm_type
                                    }
            # Build checks dict from vm_details
            # 'key' is the persistence error_key (e.g. 'qmp_110', 'ct_101', 'vm_110')
            checks = {}
            for key, val in vm_details.items():
                vm_label = f"{val.get('type', 'VM')} {val.get('id', key)}"
                is_dismissed = val.get('dismissed', False)
                checks[vm_label] = {
                    'status': 'INFO' if is_dismissed else val.get('status', 'WARNING'),
                    'detail': val.get('reason', 'Error'),
                    'dismissable': True,
                    'dismissed': is_dismissed,
                    'error_key': key  # Must match the persistence DB key
                }
            if not issues:
                # No active (non-dismissed) issues
                if not checks:
                    checks['qmp_communication'] = {'status': 'OK', 'detail': 'No QMP timeouts detected'}
                    checks['container_startup'] = {'status': 'OK', 'detail': 'No container startup errors'}
                    checks['vm_startup'] = {'status': 'OK', 'detail': 'No VM startup failures'}
                    checks['oom_killer'] = {'status': 'OK', 'detail': 'No OOM events detected'}
                return {'status': 'OK', 'checks': checks}
            # Only consider non-dismissed items for overall severity
            active_details = {k: v for k, v in vm_details.items() if not v.get('dismissed')}
            has_critical = any(d.get('status') == 'CRITICAL' for d in active_details.values())
            return {
                'status': 'CRITICAL' if has_critical else 'WARNING',
                'reason': '; '.join(issues[:3]),
                'details': vm_details,
                'checks': checks
            }
        except Exception as e:
            print(f"[HealthMonitor] VMs/CTs persistence check failed: {e}")
            return {'status': 'UNKNOWN', 'reason': f'VM/CT check unavailable: {str(e)}', 'checks': {}}

    def _check_pve_services(self) -> Dict[str, Any]:
        """
        Check critical Proxmox services with persistence tracking.
- Checks the base PVE_SERVICES list - Dynamically adds corosync if a cluster config exists - Records failed services in persistence for tracking/dismiss - Auto-clears when services recover """ try: # Build service list: base PVE services + corosync if clustered services_to_check = list(self.PVE_SERVICES) is_cluster = os.path.exists('/etc/corosync/corosync.conf') if is_cluster and 'corosync' not in services_to_check: services_to_check.append('corosync') failed_services = [] service_details = {} for service in services_to_check: try: result = subprocess.run( ['systemctl', 'is-active', service], capture_output=True, text=True, timeout=2 ) status = result.stdout.strip() if result.returncode != 0 or status != 'active': failed_services.append(service) service_details[service] = status or 'inactive' except Exception: failed_services.append(service) service_details[service] = 'error' # Build checks dict with status per service checks = {} for svc in services_to_check: error_key = f'pve_service_{svc}' if svc in failed_services: state = service_details.get(svc, 'inactive') checks[svc] = { 'status': 'CRITICAL', 'detail': f'Service is {state}', 'error_key': error_key, 'dismissable': True, } else: checks[svc] = { 'status': 'OK', 'detail': 'Active', 'error_key': error_key, } if is_cluster: checks['cluster_mode'] = { 'status': 'OK', 'detail': 'Cluster detected (corosync.conf present)', } if failed_services: reason = f'Services inactive: {", ".join(failed_services)}' # Record each failed service in persistence, respecting dismiss active_failed = [] for svc in failed_services: error_key = f'pve_service_{svc}' rec_result = health_persistence.record_error( error_key=error_key, category='pve_services', severity='CRITICAL', reason=f'PVE service {svc} is {service_details.get(svc, "inactive")}', details={'service': svc, 'state': service_details.get(svc, 'inactive')} ) if rec_result and rec_result.get('type') == 'skipped_acknowledged': # Mark as dismissed in checks for frontend if svc in 
checks: checks[svc]['dismissed'] = True else: active_failed.append(svc) # Auto-clear services that recovered for svc in services_to_check: if svc not in failed_services: error_key = f'pve_service_{svc}' if health_persistence.is_error_active(error_key): health_persistence.clear_error(error_key) # If all failed services are dismissed, return OK if not active_failed: return { 'status': 'OK', 'reason': None, 'failed': [], 'is_cluster': is_cluster, 'services_checked': len(services_to_check), 'checks': checks } return { 'status': 'CRITICAL', 'reason': f'Services inactive: {", ".join(active_failed)}', 'failed': active_failed, 'is_cluster': is_cluster, 'services_checked': len(services_to_check), 'checks': checks } # All OK - clear any previously tracked service errors for svc in services_to_check: error_key = f'pve_service_{svc}' if health_persistence.is_error_active(error_key): health_persistence.clear_error(error_key) return { 'status': 'OK', 'is_cluster': is_cluster, 'services_checked': len(services_to_check), 'checks': checks } except Exception as e: return { 'status': 'WARNING', 'reason': f'Service check command failed: {str(e)}' } def _is_benign_error(self, line: str) -> bool: """Check if log line matches benign error patterns""" line_lower = line.lower() for pattern in self.BENIGN_ERROR_PATTERNS: if re.search(pattern, line_lower): return True return False def _enrich_critical_log_reason(self, line: str) -> str: """ Transform a raw kernel/system log line into a human-readable reason for notifications and the health dashboard. 
""" line_lower = line.lower() # EXT4/BTRFS/XFS/ZFS filesystem errors if 'ext4-fs error' in line_lower or 'btrfs error' in line_lower or 'xfs' in line_lower and 'error' in line_lower: fs_type = 'EXT4' if 'ext4' in line_lower else ('BTRFS' if 'btrfs' in line_lower else 'XFS') dev_match = re.search(r'device\s+(\S+?)\)?:', line) device = dev_match.group(1).rstrip(')') if dev_match else 'unknown' func_match = re.search(r':\s+(\w+):\d+:', line) func_name = func_match.group(1) if func_match else '' inode_match = re.search(r'inode\s+#?(\d+)', line) inode = inode_match.group(1) if inode_match else '' # Translate function name func_translations = { 'ext4_find_entry': 'directory lookup failed (possible directory corruption)', 'ext4_lookup': 'file lookup failed (possible metadata corruption)', 'ext4_journal_start': 'journal transaction failed (journal corruption)', 'ext4_readdir': 'directory read failed (directory data corrupted)', 'ext4_get_inode_loc': 'inode location failed (inode table corruption)', '__ext4_get_inode_loc': 'inode location failed (inode table corruption)', 'ext4_xattr_get': 'extended attributes read failed', 'ext4_iget': 'inode read failed (possible inode corruption)', 'ext4_mb_generate_buddy': 'block allocator error', 'ext4_validate_block_bitmap': 'block bitmap corrupted', 'ext4_validate_inode_bitmap': 'inode bitmap corrupted', 'htree_dirblock_to_tree': 'directory index tree corrupted', } # Identify the device device_info = self._identify_block_device(device) reason = f'{fs_type} filesystem error on /dev/{device}' if device_info: reason += f'\nDevice: {device_info}' else: reason += f'\nDevice: /dev/{device} (not currently detected -- may be a disconnected USB or temporary device)' if func_name: desc = func_translations.get(func_name, func_name) reason += f'\nError: {desc}' if inode: inode_hint = 'root directory' if inode == '2' else f'inode #{inode}' reason += f'\nAffected: {inode_hint}' reason += f'\nAction: Run "fsck /dev/{device}" (unmount first)' return 
reason # Out of memory if 'out of memory' in line_lower or 'oom_kill' in line_lower: m = re.search(r'Killed process\s+\d+\s+\(([^)]+)\)', line) process = m.group(1) if m else 'unknown' return f'Out of memory - system killed process "{process}" to free RAM' # Kernel panic if 'kernel panic' in line_lower: return 'Kernel panic - system halted. Reboot required.' # Segfault if 'segfault' in line_lower: m = re.search(r'(\S+)\[\d+\].*segfault', line) process = m.group(1) if m else 'unknown' is_critical_proc = any(p in process.lower() for p in self.PVE_CRITICAL_PROCESSES) if is_critical_proc: return f'Critical process "{process}" crashed (segmentation fault) -- PVE service affected' return f'Process "{process}" crashed (segmentation fault)' # Hardware error if 'hardware error' in line_lower or 'mce:' in line_lower: return f'Hardware error detected (MCE) - check CPU/RAM health' # RAID failure if 'raid' in line_lower and 'fail' in line_lower: md_match = re.search(r'(md\d+)', line) md_dev = md_match.group(1) if md_match else 'unknown' return f'RAID array {md_dev} degraded or failed - check disk status' # Fallback: clean up the raw line clean = re.sub(r'^\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s+\S+\s+', '', line) clean = re.sub(r'\[\d+\]:\s*', '', clean) return clean[:150] def _classify_log_severity(self, line: str) -> Optional[str]: """ Classify log line severity intelligently. Returns: 'CRITICAL', 'WARNING', or None (benign/info) Design principles: - CRITICAL must be reserved for events that require IMMEDIATE action (data loss risk, service outage, hardware failure confirmed by SMART). - WARNING is for events worth investigating but not urgent. - Everything else is None (benign/informational). """ line_lower = line.lower() # Check if benign first -- fast path for known noise if self._is_benign_error(line): return None # Check critical keywords (hard failures: OOM, panic, FS corruption, etc.) 
        for keyword in self.CRITICAL_LOG_KEYWORDS:
            if re.search(keyword, line_lower):
                return 'CRITICAL'

        # Check warning keywords (includes segfault, I/O errors, etc.)
        for keyword in self.WARNING_LOG_KEYWORDS:
            if re.search(keyword, line_lower):
                # Special case: segfault of a PVE-critical process is CRITICAL
                if 'segfault' in line_lower:
                    for proc in self.PVE_CRITICAL_PROCESSES:
                        if proc in line_lower:
                            return 'CRITICAL'
                return 'WARNING'

        # Generic classification -- very conservative to avoid false positives.
        # Only escalate if the line explicitly uses severity-level keywords
        # from the kernel or systemd (not just any line containing "error").
        # Note precedence: panic OR ('fatal' present AND 'non-fatal' absent).
        if 'kernel panic' in line_lower or 'fatal' in line_lower and 'non-fatal' not in line_lower:
            return 'CRITICAL'

        # Lines from priority "err" that don't match any keyword above are
        # likely informational noise (e.g. "error response from daemon").
        # Return None to avoid flooding the dashboard with non-actionable items.
        return None

    def _check_logs_with_persistence(self) -> Dict[str, Any]:
        """
        Intelligent log checking with cascade detection and persistence.
        Focuses on detecting significant error patterns rather than transient warnings.
        New thresholds:
        - CASCADE: >=15 errors (increased from 10)
        - SPIKE: >=5 errors AND 4x increase (more restrictive)
        - PERSISTENT: Same error in 3 consecutive checks
        """
        cache_key = 'logs_analysis'
        current_time = time.time()

        # Cache the result for 5 minutes to avoid excessive journalctl calls
        if cache_key in self.last_check_times:
            if current_time - self.last_check_times[cache_key] < self.LOG_CHECK_INTERVAL:
                # Return the full cached result (which includes 'checks' dict)
                cached = self.cached_results.get(cache_key)
                if cached:
                    return cached
                # Cache timestamp present but no payload: report all-OK defaults
                return {'status': 'OK', 'checks': {
                    'log_error_cascade': {'status': 'OK', 'detail': 'No cascading errors'},
                    'log_error_spike': {'status': 'OK', 'detail': 'No error spikes'},
                    'log_persistent_errors': {'status': 'OK', 'detail': 'No persistent patterns'},
                    'log_critical_errors': {'status': 'OK', 'detail': 'No critical errors'}
                }}

        try:
            # Fetch logs from the last 3 minutes for immediate issue detection
            result_recent = subprocess.run(
                ['journalctl', '--since', '3 minutes ago', '--no-pager', '-p', 'warning'],
                capture_output=True, text=True, timeout=3
            )
            # Fetch logs from the previous 3-minute interval to detect spikes/cascades
            result_previous = subprocess.run(
                ['journalctl', '--since', '6 minutes ago', '--until', '3 minutes ago', '--no-pager', '-p', 'warning'],
                capture_output=True, text=True, timeout=3
            )

            if result_recent.returncode == 0:
                recent_lines = result_recent.stdout.strip().split('\n')
                previous_lines = result_previous.stdout.strip().split('\n') if result_previous.returncode == 0 else []

                recent_patterns = defaultdict(int)
                previous_patterns = defaultdict(int)
                critical_errors_found = {}  # To store unique critical error lines for persistence

                for line in recent_lines:
                    if not line.strip():
                        continue

                    # Skip benign errors
                    if self._is_benign_error(line):
                        continue

                    # Classify severity
                    severity = self._classify_log_severity(line)
                    if severity is None:
                        # Skip informational or classified benign lines
                        continue

                    # Normalize to a pattern for grouping
                    pattern = self._normalize_log_pattern(line)

                    if severity == 'CRITICAL':
                        # Short stable hash of the normalized pattern keys the
                        # persistence record (MD5 used for identity, not security)
                        pattern_hash = hashlib.md5(pattern.encode()).hexdigest()[:8]
                        error_key = f'log_critical_{pattern_hash}'

                        # ── SMART cross-reference for disk/FS errors ──
                        # Filesystem and disk errors are only truly CRITICAL if
                        # the underlying disk is actually failing. We check:
                        # 1. Device exists? No -> WARNING (disconnected USB, etc.)
                        # 2. SMART PASSED? -> WARNING (transient error, not disk failure)
                        # 3. SMART FAILED? -> CRITICAL (confirmed hardware problem)
                        # 4. SMART UNKNOWN? -> WARNING (can't confirm, err on side of caution)
                        fs_dev_match = re.search(
                            r'(?:ext4-fs|btrfs|xfs|zfs)\s+error.*?device\s+(\S+?)\)?[:\s]',
                            line, re.IGNORECASE
                        )
                        smart_status_for_log = None
                        if fs_dev_match:
                            fs_dev = fs_dev_match.group(1).rstrip(')')
                            # Strip trailing partition digits (sdb1 -> sdb);
                            # NOTE(review): for nvme0n1p2 this yields 'nvme0n1p'
                            # -- the disks cross-reference below handles NVMe
                            # separately, but this SMART lookup may miss it.
                            base_dev = re.sub(r'\d+$', '', fs_dev)
                            if not os.path.exists(f'/dev/{base_dev}'):
                                # Device not present -- almost certainly a disconnected drive
                                severity = 'WARNING'
                                smart_status_for_log = 'DEVICE_ABSENT'
                            elif self.capabilities.get('has_smart'):
                                smart_health = self._quick_smart_health(base_dev)
                                smart_status_for_log = smart_health
                                if smart_health == 'PASSED':
                                    # SMART says disk is healthy -- transient FS error
                                    severity = 'WARNING'
                                elif smart_health == 'UNKNOWN':
                                    # Can't verify -- be conservative, don't alarm
                                    severity = 'WARNING'
                                # smart_health == 'FAILED' -> keep CRITICAL

                    if pattern not in critical_errors_found:
                            # Only count as "critical" if severity wasn't downgraded
                            if severity == 'CRITICAL':
                                critical_errors_found[pattern] = line

                            # Build a human-readable reason from the raw log line
                            enriched_reason = self._enrich_critical_log_reason(line)

                            # Append SMART context to the reason if we checked it
                            if smart_status_for_log == 'PASSED':
                                enriched_reason += '\nSMART: Passed (disk is healthy -- error is likely transient)'
                            elif smart_status_for_log == 'FAILED':
                                enriched_reason += '\nSMART: FAILED -- disk is failing, replace immediately'
                            elif smart_status_for_log == 'DEVICE_ABSENT':
                                enriched_reason += '\nDevice not currently detected -- may be a disconnected USB or temporary device'

                            # Record persistent error if it's not already active
                            if not health_persistence.is_error_active(error_key, category='logs'):
                                health_persistence.record_error(
                                    error_key=error_key,
                                    category='logs',
                                    severity=severity,
                                    reason=enriched_reason,
                                    details={'pattern': pattern, 'raw_line': line[:200], 'smart_status': smart_status_for_log, 'dismissable': True}
                                )

                            # Cross-reference: filesystem errors also belong in the disks category
                            # so they appear in the Storage/Disks dashboard section
                            fs_match = re.search(r'(?:ext4-fs|btrfs|xfs|zfs)\s+error.*?(?:device\s+(\S+?)\)?[:\s])', line, re.IGNORECASE)
                            if fs_match:
                                fs_device = fs_match.group(1).rstrip(')') if fs_match.group(1) else 'unknown'
                                # Strip partition number to get base disk (sdb1 -> sdb)
                                base_device = re.sub(r'\d+$', '', fs_device) if not ('nvme' in fs_device or 'mmcblk' in fs_device) else fs_device.rsplit('p', 1)[0] if 'p' in fs_device else fs_device
                                disk_error_key = f'disk_fs_{fs_device}'

                                # Use the SMART-aware severity we already determined above
                                device_exists = os.path.exists(f'/dev/{base_device}')
                                if not device_exists:
                                    fs_severity = 'WARNING'
                                elif smart_status_for_log == 'PASSED':
                                    fs_severity = 'WARNING'  # SMART healthy -> transient
                                elif smart_status_for_log == 'FAILED':
                                    fs_severity = 'CRITICAL'  # SMART failing -> real problem
                                else:
                                    fs_severity = 'WARNING'  # Can't confirm -> conservative

                                if not health_persistence.is_error_active(disk_error_key, category='disks'):
                                    health_persistence.record_error(
                                        error_key=disk_error_key,
                                        category='disks',
                                        severity=fs_severity,
                                        reason=enriched_reason,
                                        details={
                                            'disk': base_device,
                                            'device': f'/dev/{fs_device}',
                                            'error_type': 'filesystem',
                                            'error_count': 1,
                                            'sample': line[:200],
                                            'smart_status': smart_status_for_log,
                                            'dismissable': True,
                                            'device_exists': device_exists,
                                        }
                                    )

                    recent_patterns[pattern] += 1
                    if pattern in self.persistent_log_patterns:
                        self.persistent_log_patterns[pattern]['count'] += 1
                        self.persistent_log_patterns[pattern]['last_seen'] = current_time
                    else:
                        self.persistent_log_patterns[pattern] = {
                            'count': 1,
                            'first_seen': current_time,
                            'last_seen': current_time,
                            'sample': line.strip()[:200],  # Original line for display
                        }

                # Baseline pass over the previous 3-minute window: same
                # filtering, but only pattern counts (used for spike detection)
                for line in previous_lines:
                    if not line.strip():
                        continue
                    # Skip benign errors
                    if self._is_benign_error(line):
                        continue
                    # Classify severity
                    severity = self._classify_log_severity(line)
                    if severity is None:
                        # Skip informational or classified benign lines
                        continue
                    # Normalize to a pattern for grouping
                    pattern = self._normalize_log_pattern(line)
                    previous_patterns[pattern] += 1

                # CASCADE: pattern repeated >=15 times in the recent window.
                # NOTE(review): this re-classifies the NORMALIZED pattern, not
                # the raw line -- normalization lowercases and substitutes IDs,
                # so a pattern may classify differently than its source lines.
                cascading_errors = {
                    pattern: count for pattern, count in recent_patterns.items()
                    if count >= 15 and self._classify_log_severity(pattern) in ['WARNING', 'CRITICAL']
                }

                # SPIKE: >=5 occurrences AND at least 4x the previous window
                spike_errors = {}
                for pattern, recent_count in recent_patterns.items():
                    prev_count = previous_patterns.get(pattern, 0)
                    if recent_count >= 5 and recent_count >= prev_count * 4:
                        spike_errors[pattern] = recent_count

                # Helper: get human-readable samples from normalized patterns
                def _get_samples(error_dict, max_items=3):
                    """Return list of readable sample lines for error patterns."""
                    samples = []
                    for pattern in list(error_dict.keys())[:max_items]:
                        pdata = self.persistent_log_patterns.get(pattern, {})
                        sample = pdata.get('sample', pattern)
                        # Trim timestamp prefix if present (e.g. "Feb 27 16:03:35 host ")
                        clean = re.sub(r'^[A-Z][a-z]{2}\s+\d+\s+[\d:]+\s+\S+\s+', '', sample)
                        samples.append(clean[:120])
                    return samples

                # PERSISTENT: pattern seen >=3 times spanning >=15 minutes
                persistent_errors = {}
                for pattern, data in self.persistent_log_patterns.items():
                    time_span = current_time - data['first_seen']
                    if data['count'] >= 3 and time_span >= 900:  # 15 minutes
                        persistent_errors[pattern] = data['count']
                        # Record as warning if not already recorded
                        pattern_hash = hashlib.md5(pattern.encode()).hexdigest()[:8]
                        error_key = f'log_persistent_{pattern_hash}'
                        if not health_persistence.is_error_active(error_key, category='logs'):
                            # Use the original sample line for the notification,
                            # not the normalized pattern (which has IDs replaced).
                            sample = data.get('sample', pattern)
                            # Strip journal timestamp prefix so the stored reason
                            # doesn't contain dated information that confuses
                            # re-notifications.
                            clean_sample = re.sub(
                                r'^[A-Z][a-z]{2}\s+\d+\s+[\d:]+\s+\S+\s+', '', sample
                            )
                            health_persistence.record_error(
                                error_key=error_key,
                                category='logs',
                                severity='WARNING',
                                reason=f'Recurring error ({data["count"]}x): {clean_sample[:150]}',
                                details={'pattern': pattern, 'sample': sample, 'dismissable': True, 'occurrences': data['count']}
                            )

                # Drop patterns not seen for 30+ minutes from in-memory tracking
                patterns_to_remove = [
                    p for p, data in self.persistent_log_patterns.items()
                    if current_time - data['last_seen'] > 1800
                ]
                for pattern in patterns_to_remove:
                    del self.persistent_log_patterns[pattern]

                unique_critical_count = len(critical_errors_found)
                cascade_count = len(cascading_errors)
                spike_count = len(spike_errors)
                persistent_count = len(persistent_errors)

                # Overall status precedence: critical > cascade > spike > persistent
                if unique_critical_count > 0:
                    status = 'CRITICAL'
                    # Use enriched reason from the first critical error for the summary
                    representative_line = next(iter(critical_errors_found.values()))
                    enriched = self._enrich_critical_log_reason(representative_line)
                    if unique_critical_count == 1:
                        reason = enriched
                    else:
                        reason = f'{unique_critical_count} critical error(s):\n{enriched}'
                elif cascade_count > 0:
                    status = 'WARNING'
                    samples = _get_samples(cascading_errors, 3)
                    reason = f'Error cascade ({cascade_count} patterns repeating):\n' + '\n'.join(f' - {s}' for s in samples)
                elif spike_count > 0:
                    status = 'WARNING'
                    samples = _get_samples(spike_errors, 3)
                    reason = f'Error spike ({spike_count} patterns with 4x increase):\n' + '\n'.join(f' - {s}' for s in samples)
                elif persistent_count > 0:
                    status = 'WARNING'
                    samples = _get_samples(persistent_errors, 3)
                    reason = f'Persistent errors ({persistent_count} patterns over 15+ min):\n' + '\n'.join(f' - {s}' for s in samples)
                else:
                    # No significant issues found
                    status = 'OK'
                    reason = None

                # Record/clear persistent errors for each log sub-check so Dismiss works
                cascade_samples = _get_samples(cascading_errors, 2) if cascade_count else []
                spike_samples = _get_samples(spike_errors, 2) if spike_count else []
                persist_samples = _get_samples(persistent_errors, 2) if persistent_count else []

                log_sub_checks = {
                    'log_error_cascade': {'active': cascade_count > 0, 'severity': 'WARNING', 'reason': f'{cascade_count} pattern(s) repeating >=15 times:\n' + '\n'.join(f' - {s}' for s in cascade_samples) if cascade_count else ''},
                    'log_error_spike': {'active': spike_count > 0, 'severity': 'WARNING', 'reason': f'{spike_count} pattern(s) with 4x increase:\n' + '\n'.join(f' - {s}' for s in spike_samples) if spike_count else ''},
                    'log_persistent_errors': {'active': persistent_count > 0, 'severity': 'WARNING', 'reason': f'{persistent_count} recurring pattern(s) over 15+ min:\n' + '\n'.join(f' - {s}' for s in persist_samples) if persistent_count else ''},
                    'log_critical_errors': {'active': unique_critical_count > 0, 'severity': 'CRITICAL', 'reason': f'{unique_critical_count} critical error(s) found', 'dismissable': False},
                }

                # Track which sub-checks were dismissed
                dismissed_keys = set()
                for err_key, info in log_sub_checks.items():
                    if info['active']:
                        is_dismissable = info.get('dismissable', True)
                        result = health_persistence.record_error(
                            error_key=err_key,
                            category='logs',
                            severity=info['severity'],
                            reason=info['reason'],
                            details={'dismissable': is_dismissable}
                        )
                        if result and result.get('type') == 'skipped_acknowledged':
                            dismissed_keys.add(err_key)
                    elif health_persistence.is_error_active(err_key):
                        health_persistence.clear_error(err_key)

                # Build checks dict - downgrade dismissed items to INFO
                def _log_check_status(key, active, severity):
                    if not active:
                        return 'OK'
                    if key in dismissed_keys:
                        return 'INFO'
                    return severity

                log_checks = {
                    'log_error_cascade': {
                        'status': _log_check_status('log_error_cascade', cascade_count > 0, 'WARNING'),
                        'detail': f'{cascade_count} pattern(s) repeating >=15 times' if cascade_count > 0 else 'No cascading errors',
                        'dismissable': True,
                        'dismissed': 'log_error_cascade' in dismissed_keys,
                        'error_key': 'log_error_cascade'
                    },
                    'log_error_spike': {
                        'status': _log_check_status('log_error_spike', spike_count > 0, 'WARNING'),
                        'detail': f'{spike_count} pattern(s) with 4x increase' if spike_count > 0 else 'No error spikes',
                        'dismissable': True,
                        'dismissed': 'log_error_spike' in dismissed_keys,
                        'error_key': 'log_error_spike'
                    },
                    'log_persistent_errors': {
                        'status': _log_check_status('log_persistent_errors', persistent_count > 0, 'WARNING'),
                        'detail': f'{persistent_count} recurring pattern(s) over 15+ min' if persistent_count > 0 else 'No persistent patterns',
                        'dismissable': True,
                        'dismissed': 'log_persistent_errors' in dismissed_keys,
                        'error_key': 'log_persistent_errors'
                    },
                    'log_critical_errors': {
                        'status': _log_check_status('log_critical_errors', unique_critical_count > 0, 'CRITICAL'),
                        'detail': reason if unique_critical_count > 0 else 'No critical errors',
                        'dismissable': False,
                        'error_key': 'log_critical_errors'
                    }
                }

                # Recalculate overall status considering dismissed items
                active_issues = {k: v for k, v in log_checks.items() if v['status'] in ('WARNING', 'CRITICAL')}
                if not active_issues:
                    status = 'OK'
                    reason = None
                else:
                    # Recalculate status and reason from only non-dismissed sub-checks
                    has_critical = any(v['status'] == 'CRITICAL' for v in active_issues.values())
                    status = 'CRITICAL' if has_critical else 'WARNING'
                    # Rebuild reason from active (non-dismissed) checks only
                    active_reasons = []
                    for k, v in active_issues.items():
                        detail = v.get('detail', '')
                        if detail:
                            active_reasons.append(detail)
                    reason = '; '.join(active_reasons[:3]) if active_reasons else None

                log_result = {'status': status, 'checks': log_checks}
                if reason:
                    log_result['reason'] = reason

                self.cached_results[cache_key] = log_result
                self.last_check_times[cache_key] = current_time
                return log_result

            # If journalctl command failed or returned no data
            ok_result = {'status': 'OK', 'checks': {
                'log_error_cascade': {'status': 'OK', 'detail': 'No cascading errors'},
                'log_error_spike': {'status': 'OK', 'detail': 'No error spikes'},
                'log_persistent_errors': {'status': 'OK', 'detail': 'No persistent patterns'},
                'log_critical_errors': {'status': 'OK', 'detail': 'No critical errors'}
            }}
            self.cached_results[cache_key] = ok_result
            self.last_check_times[cache_key] = current_time
            return ok_result

        except Exception as e:
            print(f"[HealthMonitor] Log check failed: {e}")
            return {'status': 'UNKNOWN', 'reason': f'Log check unavailable: {str(e)}', 'checks': {}}

    def _normalize_log_pattern(self, line: str) -> str:
        """
        Normalize log line to a pattern for grouping similar errors.
        Removes timestamps, PIDs, IDs, paths, and other variables.
""" # Remove standard syslog timestamp and process info if present pattern = re.sub(r'^\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s+\S+(\s+\[\d+\])?:\s+', '', line) pattern = re.sub(r'\d{4}-\d{2}-\d{2}', '', pattern) # Remove dates pattern = re.sub(r'\d{2}:\d{2}:\d{2}', '', pattern) # Remove times pattern = re.sub(r'pid[:\s]+\d+', 'pid:XXX', pattern.lower()) # Normalize PIDs pattern = re.sub(r'\b\d{3,6}\b', 'ID', pattern) # Normalize IDs (common for container/VM IDs) pattern = re.sub(r'/dev/\S+', '/dev/XXX', pattern) # Normalize device paths pattern = re.sub(r'/\S+/\S+', '/PATH/', pattern) # Normalize general paths pattern = re.sub(r'0x[0-9a-f]+', '0xXXX', pattern) # Normalize hex values pattern = re.sub(r'\b(uuid|guid|hash)[:=]\s*[\w-]+\b', r'\1=XXX', pattern.lower()) # Normalize UUIDs/GUIDs pattern = re.sub(r'\s+', ' ', pattern).strip() # Normalize whitespace return pattern[:150] # Keep first 150 characters to avoid overly long patterns # Regex to parse Inst lines: Inst [] ( []) _RE_INST = re.compile(r'^Inst\s+(\S+)\s+\[([^\]]+)\]\s+\((\S+)\s+') _RE_INST_NEW = re.compile(r'^Inst\s+(\S+)\s+\((\S+)\s+') _PVE_PREFIXES = ( 'pve-', 'proxmox-', 'qemu-server', 'lxc-pve', 'ceph', 'corosync', 'libpve', 'pbs-', 'pmg-', ) _KERNEL_PREFIXES = ('linux-image', 'pve-kernel', 'pve-firmware') _IMPORTANT_PKGS = { 'pve-manager', 'proxmox-ve', 'qemu-server', 'pve-container', 'pve-ha-manager', 'pve-firewall', 'ceph-common', 'proxmox-backup-client', } def _check_updates(self) -> Optional[Dict[str, Any]]: """ Check for pending system updates. - INFO: Any updates available (including security updates). - WARNING: Security updates pending 360+ days unpatched, or system not updated >1 year (365 days). - CRITICAL: System not updated >18 months (548 days). Updates are always informational unless they represent a prolonged unpatched state. Detects PVE version upgrades from pve-manager Inst lines and exposes them as an INFO sub-check. 
""" cache_key = 'updates_check' current_time = time.time() # Cache for 10 minutes if cache_key in self.last_check_times: if current_time - self.last_check_times[cache_key] < 600: return self.cached_results.get(cache_key) try: apt_history_path = '/var/log/apt/history.log' last_update_days = None sec_result = None age_result = None if os.path.exists(apt_history_path): try: mtime = os.path.getmtime(apt_history_path) days_since_update = (current_time - mtime) / 86400 last_update_days = int(days_since_update) except Exception: pass # Perform a dry run of apt-get upgrade to see pending packages try: result = subprocess.run( ['apt-get', 'upgrade', '--dry-run'], capture_output=True, text=True, timeout=30 ) except subprocess.TimeoutExpired: print("[HealthMonitor] apt-get upgrade --dry-run timed out") return { 'status': 'UNKNOWN', 'reason': 'apt-get timed out - repository may be unreachable', 'count': 0, 'checks': {} } status = 'OK' reason = None update_count = 0 security_pkgs: list = [] kernel_pkgs: list = [] pve_pkgs: list = [] important_pkgs: list = [] # {name, cur, new} pve_manager_info = None # {cur, new} or None sec_result = None sec_severity = 'INFO' sec_days_unpatched = 0 if result.returncode == 0: for line in result.stdout.strip().split('\n'): if not line.startswith('Inst '): continue update_count += 1 # Parse package name, current and new versions m = self._RE_INST.match(line) if m: pkg_name, cur_ver, new_ver = m.group(1), m.group(2), m.group(3) else: m2 = self._RE_INST_NEW.match(line) if m2: pkg_name, cur_ver, new_ver = m2.group(1), '', m2.group(2) else: parts = line.split() pkg_name = parts[1] if len(parts) > 1 else 'unknown' cur_ver, new_ver = '', '' # Strip arch suffix (e.g. 
package:amd64) pkg_name = pkg_name.split(':')[0] name_lower = pkg_name.lower() line_lower = line.lower() # Categorise if 'security' in line_lower or 'debian-security' in line_lower: security_pkgs.append(pkg_name) if any(name_lower.startswith(p) for p in self._KERNEL_PREFIXES): kernel_pkgs.append(pkg_name) elif any(name_lower.startswith(p) for p in self._PVE_PREFIXES): pve_pkgs.append(pkg_name) # Collect important packages with version info if pkg_name in self._IMPORTANT_PKGS and cur_ver: important_pkgs.append({ 'name': pkg_name, 'cur': cur_ver, 'new': new_ver }) # Detect pve-manager upgrade -> PVE version upgrade if pkg_name == 'pve-manager' and cur_ver and new_ver: pve_manager_info = {'cur': cur_ver, 'new': new_ver} # ── Determine overall status ────────────────────── if security_pkgs: sec_days_unpatched = 0 try: existing = health_persistence.get_error_by_key('security_updates') if existing and existing.get('first_seen'): from datetime import datetime first_dt = datetime.fromisoformat(existing['first_seen']) sec_days_unpatched = (datetime.now() - first_dt).days except Exception: pass if sec_days_unpatched >= self.SECURITY_WARN_DAYS: status = 'WARNING' reason = f'{len(security_pkgs)} security update(s) pending for {sec_days_unpatched} days' sec_severity = 'WARNING' else: status = 'INFO' reason = f'{len(security_pkgs)} security update(s) pending' sec_severity = 'INFO' sec_result = health_persistence.record_error( error_key='security_updates', category='updates', severity=sec_severity, reason=reason, details={'count': len(security_pkgs), 'packages': security_pkgs[:5], 'dismissable': sec_severity == 'WARNING', 'days_unpatched': sec_days_unpatched} ) if sec_result and sec_result.get('type') == 'skipped_acknowledged': status = 'INFO' reason = None elif last_update_days and last_update_days >= 548: status = 'CRITICAL' reason = f'System not updated in {last_update_days} days (>18 months)' health_persistence.record_error( error_key='system_age', category='updates', 
severity='CRITICAL', reason=reason, details={'days': last_update_days, 'update_count': update_count, 'dismissable': False} ) elif last_update_days and last_update_days >= 365: status = 'WARNING' reason = f'System not updated in {last_update_days} days (>1 year)' age_result = health_persistence.record_error( error_key='system_age', category='updates', severity='WARNING', reason=reason, details={'days': last_update_days, 'update_count': update_count, 'dismissable': True} ) if age_result and age_result.get('type') == 'skipped_acknowledged': status = 'INFO' reason = None elif kernel_pkgs or pve_pkgs: status = 'INFO' reason = f'{len(kernel_pkgs)} kernel + {len(pve_pkgs)} Proxmox update(s) available' elif update_count > 0: status = 'INFO' reason = f'{update_count} package update(s) pending' elif result.returncode != 0: status = 'WARNING' reason = 'Failed to check for updates (apt-get error)' # ── Build checks dict ───────────────────────────────── age_dismissed = bool(age_result and age_result.get('type') == 'skipped_acknowledged') update_age_status = 'CRITICAL' if (last_update_days and last_update_days >= 548) else ( 'INFO' if age_dismissed else ('WARNING' if (last_update_days and last_update_days >= 365) else 'OK')) sec_dismissed = security_pkgs and sec_result and sec_result.get('type') == 'skipped_acknowledged' if sec_dismissed: sec_status = 'INFO' elif security_pkgs: sec_status = sec_severity else: sec_status = 'OK' sec_detail = f'{len(security_pkgs)} security update(s) pending' if security_pkgs and sec_days_unpatched >= self.SECURITY_WARN_DAYS: sec_detail += f' ({sec_days_unpatched} days unpatched)' checks = { 'kernel_pve': { 'status': 'INFO' if kernel_pkgs else 'OK', 'detail': f'{len(kernel_pkgs)} kernel/PVE update(s)' if kernel_pkgs else 'Kernel/PVE up to date', 'error_key': 'kernel_pve' }, 'pending_updates': { 'status': 'INFO' if update_count > 0 else 'OK', 'detail': f'{update_count} package(s) pending', 'error_key': 'pending_updates' }, 'security_updates': { 
'status': sec_status, 'detail': sec_detail if security_pkgs else 'No security updates pending', 'dismissable': sec_status == 'WARNING' and not sec_dismissed, 'dismissed': bool(sec_dismissed), 'error_key': 'security_updates' }, 'system_age': { 'status': update_age_status, 'detail': f'Last updated {last_update_days} day(s) ago' if last_update_days is not None else 'Unknown', 'dismissable': update_age_status == 'WARNING' and not age_dismissed, 'dismissed': bool(age_dismissed), 'error_key': 'system_age' }, } # PVE version sub-check (always INFO) if pve_manager_info: checks['pve_version'] = { 'status': 'INFO', 'detail': f"PVE {pve_manager_info['cur']} -> {pve_manager_info['new']} available", 'error_key': 'pve_version' } else: checks['pve_version'] = { 'status': 'OK', 'detail': 'Proxmox VE is up to date', 'error_key': 'pve_version' } # Construct result dictionary update_result = { 'status': status, 'count': update_count, 'checks': checks, } if reason: update_result['reason'] = reason if last_update_days is not None: update_result['days_since_update'] = last_update_days # Attach categorised counts for the frontend update_result['security_count'] = len(security_pkgs) update_result['pve_count'] = len(pve_pkgs) update_result['kernel_count'] = len(kernel_pkgs) update_result['important_packages'] = important_pkgs[:8] self.cached_results[cache_key] = update_result self.last_check_times[cache_key] = current_time return update_result except Exception as e: print(f"[HealthMonitor] Updates check failed: {e}") return {'status': 'UNKNOWN', 'reason': f'Updates check unavailable: {str(e)}', 'count': 0, 'checks': {}} def _check_fail2ban_bans(self) -> Dict[str, Any]: """ Check if fail2ban is installed and if there are currently banned IPs. Cached for 60 seconds to avoid hammering fail2ban-client. 
        Returns:
            {'installed': bool, 'active': bool, 'status': str, 'detail': str,
             'banned_count': int, 'jails': [...], 'banned_ips': [...]}
        """
        cache_key = 'fail2ban_bans'
        current_time = time.time()
        # Serve the cached result while it is younger than 60 seconds.
        if cache_key in self.last_check_times:
            if current_time - self.last_check_times[cache_key] < 60:
                return self.cached_results.get(cache_key, {'installed': False, 'status': 'OK', 'detail': 'Not installed'})

        # Default result for a host without fail2ban; refined step by step below.
        result = {'installed': False, 'active': False, 'status': 'OK', 'detail': 'Not installed',
                  'banned_count': 0, 'jails': [], 'banned_ips': []}

        try:
            # Check if fail2ban-client exists
            which_result = subprocess.run(
                ['which', 'fail2ban-client'],
                capture_output=True, text=True, timeout=2
            )
            if which_result.returncode != 0:
                # Not installed: cache the default result and return early.
                self.cached_results[cache_key] = result
                self.last_check_times[cache_key] = current_time
                return result

            result['installed'] = True

            # Check if fail2ban service is active
            active_check = subprocess.run(
                ['systemctl', 'is-active', 'fail2ban'],
                capture_output=True, text=True, timeout=2
            )
            if active_check.stdout.strip() != 'active':
                result['detail'] = 'Fail2Ban installed but service not active'
                self.cached_results[cache_key] = result
                self.last_check_times[cache_key] = current_time
                return result

            result['active'] = True

            # Get list of active jails
            jails_result = subprocess.run(
                ['fail2ban-client', 'status'],
                capture_output=True, text=True, timeout=3
            )
            jails = []
            if jails_result.returncode == 0:
                # Output contains a line such as "Jail list: sshd, proxmox".
                for line in jails_result.stdout.split('\n'):
                    if 'Jail list:' in line:
                        jail_str = line.split('Jail list:')[1].strip()
                        jails = [j.strip() for j in jail_str.split(',') if j.strip()]
                        break

            if not jails:
                result['detail'] = 'Fail2Ban active, no jails configured'
                self.cached_results[cache_key] = result
                self.last_check_times[cache_key] = current_time
                return result

            result['jails'] = jails

            # Check each jail for banned IPs
            total_banned = 0
            all_banned_ips = []
            jails_with_bans = []
            for jail in jails:
                try:
                    jail_result = subprocess.run(
                        ['fail2ban-client', 'status', jail],
                        capture_output=True,
                        text=True, timeout=2
                    )
                    if jail_result.returncode == 0:
                        # Parse the per-jail report for the ban counter and IP list.
                        for line in jail_result.stdout.split('\n'):
                            if 'Currently banned:' in line:
                                try:
                                    count = int(line.split('Currently banned:')[1].strip())
                                    if count > 0:
                                        total_banned += count
                                        jails_with_bans.append(jail)
                                except (ValueError, IndexError):
                                    pass
                            elif 'Banned IP list:' in line:
                                ips_str = line.split('Banned IP list:')[1].strip()
                                if ips_str:
                                    ips = [ip.strip() for ip in ips_str.split() if ip.strip()]
                                    all_banned_ips.extend(ips[:10])  # Limit to 10 IPs per jail
                except Exception:
                    # One jail failing to report must not abort the others.
                    pass

            result['banned_count'] = total_banned
            result['banned_ips'] = all_banned_ips[:20]  # Max 20 total

            if total_banned > 0:
                jails_str = ', '.join(jails_with_bans)
                msg = f'{total_banned} IP(s) currently banned by Fail2Ban (jails: {jails_str})'
                result['status'] = 'WARNING'
                result['detail'] = msg
                # Persistence handled by _check_security caller via security_fail2ban key
            else:
                result['detail'] = f'Fail2Ban active ({len(jails)} jail(s), no current bans)'
                # Auto-resolve if previously banned IPs are now gone
                # NOTE(review): this clears the 'fail2ban' key, while the
                # _check_security caller persists under 'security_fail2ban' --
                # confirm the legacy key is still written anywhere.
                if health_persistence.is_error_active('fail2ban'):
                    health_persistence.clear_error('fail2ban')

        except Exception as e:
            result['detail'] = f'Unable to check Fail2Ban: {str(e)[:50]}'

        self.cached_results[cache_key] = result
        self.last_check_times[cache_key] = current_time
        return result

    def _check_security(self) -> Dict[str, Any]:
        """
        Check security-related items with detailed sub-item breakdown:
        - Uptime check: >1 year without kernel update indicates vulnerability
        - SSL certificates: PVE certificate expiration
        - Login attempts: Excessive failed logins (brute force detection)
        - Fail2Ban: Currently banned IPs (if fail2ban is installed)

        Returns a result with 'checks' dict containing per-item status.
""" try: issues = [] checks = { 'uptime': {'status': 'OK', 'detail': ''}, 'certificates': {'status': 'OK', 'detail': ''}, 'login_attempts': {'status': 'OK', 'detail': ''}, } # Sub-check 1: Uptime for potential kernel vulnerabilities try: uptime_seconds = time.time() - psutil.boot_time() uptime_days = uptime_seconds / 86400 if uptime_days > 365: updates_data = self.cached_results.get('updates_check') if updates_data and updates_data.get('days_since_update', 9999) > 365: msg = f'Uptime {int(uptime_days)} days (>1 year, consider updating kernel/system)' issues.append(msg) checks['uptime'] = {'status': 'WARNING', 'detail': msg, 'days': int(uptime_days), 'dismissable': True} else: checks['uptime'] = {'status': 'OK', 'detail': f'Uptime {int(uptime_days)} days, system recently updated'} else: checks['uptime'] = {'status': 'OK', 'detail': f'Uptime {int(uptime_days)} days'} except Exception: checks['uptime'] = {'status': 'OK', 'detail': 'Unable to determine uptime'} # Sub-check 2: SSL certificates cert_status = self._check_certificates() if cert_status: cert_sev = cert_status.get('status', 'OK') cert_reason = cert_status.get('reason', '') checks['certificates'] = { 'status': cert_sev, 'detail': cert_reason if cert_reason else 'Certificate valid', 'dismissable': True if cert_sev not in ['OK', 'INFO'] else False } if cert_sev not in ['OK', 'INFO']: issues.append(cert_reason or 'Certificate issue') # Sub-check 3: Failed login attempts (brute force detection) try: result = subprocess.run( ['journalctl', '--since', '24 hours ago', '--no-pager'], capture_output=True, text=True, timeout=3 ) failed_logins = 0 if result.returncode == 0: for line in result.stdout.split('\n'): line_lower = line.lower() if 'authentication failure' in line_lower or 'failed password' in line_lower or 'invalid user' in line_lower: failed_logins += 1 if failed_logins > 50: msg = f'{failed_logins} failed login attempts in 24h' issues.append(msg) checks['login_attempts'] = {'status': 'WARNING', 'detail': 
msg, 'count': failed_logins, 'dismissable': True} elif failed_logins > 0: checks['login_attempts'] = {'status': 'OK', 'detail': f'{failed_logins} failed attempts in 24h (within threshold)', 'count': failed_logins} else: checks['login_attempts'] = {'status': 'OK', 'detail': 'No failed login attempts in 24h', 'count': 0} except Exception: checks['login_attempts'] = {'status': 'OK', 'detail': 'Unable to check login attempts'} # Sub-check 4: Fail2Ban ban detection (only show if installed) try: f2b = self._check_fail2ban_bans() if f2b.get('installed', False): f2b_status = f2b.get('status', 'OK') checks['fail2ban'] = { 'status': f2b_status, 'dismissable': True if f2b_status not in ['OK'] else False, 'detail': f2b.get('detail', ''), 'installed': True, 'banned_count': f2b.get('banned_count', 0) } if f2b.get('status') == 'WARNING': issues.append(f2b.get('detail', 'Fail2Ban bans detected')) # If not installed, simply don't add it to checks except Exception: pass # Persist errors and respect dismiss for each sub-check dismissed_keys = set() security_sub_checks = { 'security_login_attempts': 'login_attempts', 'security_certificates': 'certificates', 'security_uptime': 'uptime', 'security_fail2ban': 'fail2ban', } # Inject error_key into each check so the frontend knows which DB key to use for err_key, check_name in security_sub_checks.items(): if check_name in checks: checks[check_name]['error_key'] = err_key for err_key, check_name in security_sub_checks.items(): check_info = checks.get(check_name, {}) check_status = check_info.get('status', 'OK') if check_status not in ('OK', 'INFO'): is_dismissable = check_info.get('dismissable', True) rec_result = health_persistence.record_error( error_key=err_key, category='security', severity=check_status, reason=check_info.get('detail', ''), details={'dismissable': is_dismissable} ) if rec_result and rec_result.get('type') == 'skipped_acknowledged': dismissed_keys.add(err_key) elif health_persistence.is_error_active(err_key): 
health_persistence.clear_error(err_key) # Rebuild issues excluding dismissed sub-checks key_to_check = { 'security_login_attempts': 'login_attempts', 'security_certificates': 'certificates', 'security_uptime': 'uptime', 'security_fail2ban': 'fail2ban', } active_issues = [] for err_key, check_name in key_to_check.items(): if err_key in dismissed_keys: # Mark as dismissed in checks for the frontend if check_name in checks: checks[check_name]['dismissed'] = True continue check_info = checks.get(check_name, {}) if check_info.get('status', 'OK') not in ('OK', 'INFO'): active_issues.append(check_info.get('detail', '')) # Determine overall security status from non-dismissed issues only if active_issues: has_critical = any( c.get('status') == 'CRITICAL' for k, c in checks.items() if f'security_{k}' not in dismissed_keys ) overall_status = 'CRITICAL' if has_critical else 'WARNING' return { 'status': overall_status, 'reason': '; '.join(active_issues[:2]), 'checks': checks } return { 'status': 'OK', 'checks': checks } except Exception as e: print(f"[HealthMonitor] Security check failed: {e}") return {'status': 'UNKNOWN', 'reason': f'Security check unavailable: {str(e)}', 'checks': {}} def _check_certificates(self) -> Optional[Dict[str, Any]]: """ Check SSL certificate expiration for PVE's default certificate. 
        INFO: Self-signed or no cert configured (normal for internal servers)
        WARNING: Expires <30 days
        CRITICAL: Expired
        """
        cache_key = 'certificates'
        current_time = time.time()
        # Cache for 1 day (86400 seconds)
        if cache_key in self.last_check_times:
            if current_time - self.last_check_times[cache_key] < 86400:
                return self.cached_results.get(cache_key)

        try:
            cert_path = '/etc/pve/local/pve-ssl.pem'
            if not os.path.exists(cert_path):
                cert_result = {
                    'status': 'INFO',
                    'reason': 'Self-signed or default PVE certificate'
                }
                self.cached_results[cache_key] = cert_result
                self.last_check_times[cache_key] = current_time
                return cert_result

            # Use openssl to get the expiry date
            result = subprocess.run(
                ['openssl', 'x509', '-enddate', '-noout', '-in', cert_path],
                capture_output=True, text=True, timeout=2
            )
            if result.returncode == 0:
                date_str = result.stdout.strip().replace('notAfter=', '')
                try:
                    # Parse the date string (format can vary, e.g., 'Jun 15 10:00:00 2024 GMT')
                    # Attempt common formats
                    exp_date = None
                    try:
                        # Try more detailed format first
                        exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
                    except ValueError:
                        # Fallback to simpler format if needed
                        try:
                            exp_date = datetime.strptime(date_str, '%b %d %H:%M:%S %Y')
                        except ValueError:
                            # Fallback for "notAfter=..." string itself being the issue
                            if 'notAfter=' in date_str:
                                # If it's the raw string itself
                                pass  # Will result in 'INFO' status

                    if exp_date:
                        days_until_expiry = (exp_date - datetime.now()).days
                        if days_until_expiry < 0:
                            status = 'CRITICAL'
                            reason = 'Certificate expired'
                        elif days_until_expiry < 30:
                            status = 'WARNING'
                            reason = f'Certificate expires in {days_until_expiry} days'
                        else:
                            status = 'OK'
                            reason = None

                        cert_result = {'status': status}
                        if reason:
                            cert_result['reason'] = reason
                        self.cached_results[cache_key] = cert_result
                        self.last_check_times[cache_key] = current_time
                        return cert_result
                except Exception as e:
                    print(f"[HealthMonitor] Error parsing certificate expiry date '{date_str}': {e}")
                    # Fall through to return INFO if parsing fails

            # If openssl command failed or date parsing failed
            # NOTE(review): this inconclusive result is returned without being
            # cached, so openssl is re-invoked on every call in this state.
            return {'status': 'INFO', 'reason': 'Certificate check inconclusive'}

        except Exception as e:
            print(f"[HealthMonitor] Error checking certificates: {e}")
            return {'status': 'OK'}  # Return OK on exception

    def _check_disk_health_from_events(self) -> Dict[str, Any]:
        """
        Check for disk health warnings/errors from system logs (journalctl).
        Looks for SMART warnings and specific disk errors.
        Returns dict of disk issues found.
""" disk_issues = {} try: # Check journalctl for warnings/errors related to disks in the last hour result = subprocess.run( ['journalctl', '--since', '1 hour ago', '--no-pager', '-p', 'warning'], capture_output=True, text=True, timeout=3 ) if result.returncode == 0: for line in result.stdout.split('\n'): line_lower = line.lower() # Check for SMART warnings/errors if 'smart' in line_lower and ('warning' in line_lower or 'error' in line_lower or 'fail' in line_lower): # Extract disk name using regex for common disk identifiers disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+|hd\d+)', line) if disk_match: disk_name = disk_match.group(1) # Prioritize CRITICAL if already warned, otherwise set to WARNING if disk_name not in disk_issues or disk_issues[f'/dev/{disk_name}']['status'] != 'CRITICAL': disk_issues[f'/dev/{disk_name}'] = { 'status': 'WARNING', 'reason': 'SMART warning detected' } # Check for specific disk I/O or medium errors if any(keyword in line_lower for keyword in ['disk error', 'ata error', 'medium error', 'io error']): disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+|hd\d+)', line) if disk_match: disk_name = disk_match.group(1) disk_issues[f'/dev/{disk_name}'] = { 'status': 'CRITICAL', 'reason': 'Disk error detected' } except Exception as e: print(f"[HealthMonitor] Error checking disk health from events: {e}") # Return empty dict on error, as this check isn't system-critical itself pass return disk_issues def _check_zfs_pool_health(self) -> Dict[str, Any]: """ Check ZFS pool health status using 'zpool status' command. Returns dict of pools with non-ONLINE status (DEGRADED, FAULTED, UNAVAIL, etc.). """ zfs_issues = {} try: # First check if 'zpool' command exists to avoid errors on non-ZFS systems result_which = subprocess.run( ['which', 'zpool'], capture_output=True, text=True, timeout=1 ) if result_which.returncode != 0: # ZFS is not installed or 'zpool' command not in PATH, so no ZFS issues to report. 
                return zfs_issues

            # Get list of all pools and their health status
            result = subprocess.run(
                ['zpool', 'list', '-H', '-o', 'name,health'],  # -H for no header
                capture_output=True, text=True, timeout=5
            )
            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                for line in lines:
                    if not line.strip():
                        continue
                    parts = line.split()
                    if len(parts) >= 2:
                        pool_name = parts[0]
                        pool_health = parts[1].upper()  # Ensure uppercase for consistent comparison
                        # 'ONLINE' is the healthy state. Any other status indicates a problem.
                        if pool_health != 'ONLINE':
                            if pool_health in ['DEGRADED', 'FAULTED', 'UNAVAIL', 'REMOVED']:
                                # These are critical states
                                status = 'CRITICAL'
                                reason = f'ZFS pool {pool_health.lower()}'
                            else:
                                # Any other non-ONLINE state is at least a warning
                                status = 'WARNING'
                                reason = f'ZFS pool status: {pool_health.lower()}'
                            # Use a unique key for each pool issue
                            zfs_issues[f'zpool_{pool_name}'] = {
                                'status': status,
                                'reason': reason,
                                'pool_name': pool_name,
                                'health': pool_health
                            }
        except Exception as e:
            print(f"[HealthMonitor] Error checking ZFS pool health: {e}")
            # If 'zpool status' command itself fails, we can't report ZFS issues.
            # Return empty dict as no specific ZFS issues were detected by this check.
            pass

        return zfs_issues

    def _check_proxmox_storage(self) -> Optional[Dict[str, Any]]:
        """
        Check Proxmox storage status using the proxmox_storage_monitor module.
        Detects unavailable storages configured in PVE.
        Returns CRITICAL if any configured storage is unavailable.
        Returns None if the module is not available.
        """
        if not PROXMOX_STORAGE_AVAILABLE:
            return None

        try:
            # Reload configuration to ensure we have the latest storage definitions
            proxmox_storage_monitor.reload_configuration()
            # Get the current status of all configured storages
            storage_status = proxmox_storage_monitor.get_storage_status()
            unavailable_storages = storage_status.get('unavailable', [])

            if not unavailable_storages:
                # All storages are available.
                # We should also clear any previously recorded storage errors.
                active_errors = health_persistence.get_active_errors()
                for error in active_errors:
                    if error.get('category') == 'storage' and error.get('error_key', '').startswith('storage_unavailable_'):
                        health_persistence.clear_error(error['error_key'])

                # Build checks from all configured storages for descriptive display
                available_storages = storage_status.get('available', [])
                checks = {}
                for st in available_storages:
                    st_name = st.get('name', 'unknown')
                    st_type = st.get('type', 'unknown')
                    checks[st_name] = {
                        'status': 'OK',
                        'detail': f'{st_type} storage available'
                    }
                if not checks:
                    checks['proxmox_storages'] = {'status': 'OK', 'detail': 'All storages available'}
                return {'status': 'OK', 'checks': checks}

            storage_details = {}
            for storage in unavailable_storages:
                storage_name = storage['name']
                error_key = f'storage_unavailable_{storage_name}'
                status_detail = storage.get('status_detail', 'unavailable')

                # Formulate a descriptive reason for the issue
                if status_detail == 'not_found':
                    reason = f"Storage '{storage_name}' is configured but not found on the server."
                elif status_detail == 'unavailable':
                    reason = f"Storage '{storage_name}' is not available (connection error or backend issue)."
                else:
                    reason = f"Storage '{storage_name}' has status: {status_detail}."

                # Record a persistent CRITICAL error for each unavailable storage
                health_persistence.record_error(
                    error_key=error_key,
                    category='storage',
                    severity='CRITICAL',
                    reason=reason,
                    details={
                        'storage_name': storage_name,
                        'storage_type': storage.get('type', 'unknown'),
                        'status_detail': status_detail,
                        'dismissable': False
                    }
                )

                # Add to details dict with dismissable false for frontend
                storage_details[storage_name] = {
                    'reason': reason,
                    'type': storage.get('type', 'unknown'),
                    'status': status_detail,
                    'dismissable': False
                }

            # Build checks from storage_details
            checks = {}
            for st_name, st_info in storage_details.items():
                checks[st_name] = {
                    'status': 'CRITICAL',
                    'detail': st_info.get('reason', 'Unavailable'),
                    'dismissable': False
                }
            # Also add available storages
            available_list = storage_status.get('available', [])
            unavail_names = {s['name'] for s in unavailable_storages}
            for st in available_list:
                if st.get('name') not in unavail_names and st.get('name') not in checks:
                    checks[st['name']] = {
                        'status': 'OK',
                        'detail': f'{st.get("type", "unknown")} storage available'
                    }

            return {
                'status': 'CRITICAL',
                'reason': f'{len(unavailable_storages)} Proxmox storage(s) unavailable',
                'details': storage_details,
                'checks': checks
            }

        except Exception as e:
            print(f"[HealthMonitor] Error checking Proxmox storage: {e}")
            # Return None on exception to indicate the check could not be performed, not necessarily a failure.
            return None

    def get_health_status(self) -> Dict[str, Any]:
        """
        Main function to get the comprehensive health status.
        This function orchestrates all individual checks and aggregates results.
""" # Trigger all checks, including those with caching detailed_status = self.get_detailed_status() overall_status = self.get_overall_status() system_info = self.get_system_info() return { 'system_info': system_info, 'overall_health': overall_status, 'detailed_health': detailed_status, 'timestamp': datetime.now().isoformat() } # Duplicate get_detailed_status was removed during refactor (v1.1) # Global instance health_monitor = HealthMonitor()