diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py index 5978f7d3..c5a0a28c 100644 --- a/AppImage/scripts/notification_events.py +++ b/AppImage/scripts/notification_events.py @@ -111,6 +111,10 @@ class JournalWatcher: # Dedup: track recent events to avoid duplicates self._recent_events: Dict[str, float] = {} self._dedup_window = 30 # seconds + + # 24h anti-cascade for disk I/O + filesystem errors (keyed by device name) + self._disk_io_notified: Dict[str, float] = {} + self._DISK_IO_COOLDOWN = 86400 # 24 hours def start(self): """Start the journal watcher thread.""" @@ -333,20 +337,41 @@ class JournalWatcher: dev_match = re.search(r'device\s+(\S+?)\)?:', msg) device = dev_match.group(1).rstrip(')') if dev_match else 'unknown' - # Dedup by device: all EXT4 errors on sdb1 share ONE notification + # Dedup by device: all FS errors on sdb1 share ONE notification entity = 'disk' entity_id = f'fs_{device}' - # Check if the device physically exists to calibrate severity. - # A disconnected USB / temp device should NOT be CRITICAL. + # ── 24h dedup for filesystem errors per device ── + now_fs = time.time() + fs_dedup_key = f'fs_{device}' + last_fs_notified = self._disk_io_notified.get(fs_dedup_key, 0) + if now_fs - last_fs_notified < self._DISK_IO_COOLDOWN: + return # Already notified for this device recently + + # ── SMART + device existence gating ── import os as _os base_dev = re.sub(r'\d+$', '', device) if device != 'unknown' else '' device_exists = base_dev and _os.path.exists(f'/dev/{base_dev}') if not device_exists and device != 'unknown': - # Device not present -- downgrade to WARNING + # Device not present -- silently ignore (disconnected USB, etc.) 
+                return
+
+            # Cross-reference SMART before deciding severity
+            smart_health = self._quick_smart_health(base_dev) if base_dev else 'UNKNOWN'
+
+            if smart_health == 'PASSED':
+                # SMART healthy -- transient FS error, don't alarm
+                severity = 'INFO'
+            elif smart_health == 'FAILED':
+                severity = 'CRITICAL'
+            else:
+                # UNKNOWN -- can't verify, be conservative
                 severity = 'WARNING'
+            # Mark dedup timestamp now that we'll send
+            self._disk_io_notified[fs_dedup_key] = now_fs
+
             # Identify what this device is (model, type, mountpoint)
             device_info = self._identify_block_device(device)
 
@@ -356,21 +381,23 @@ class JournalWatcher:
             inode_match = re.search(r'inode\s+#?(\d+)', msg)
             inode = inode_match.group(1) if inode_match else ''
 
-            parts = [f'{fs_type} filesystem corruption on /dev/{device}']
-            # Add device identification so the user knows what this device is
+            parts = [f'{fs_type} filesystem error on /dev/{device}']
             if device_info:
                 parts.append(f'Device: {device_info}')
             else:
-                parts.append(f'Device: /dev/{device} (not currently detected -- may be a disconnected USB or temporary device)')
+                parts.append(f'Device: /dev/{device}')
+            parts.append(f'SMART status: {smart_health}')
 
             if func_info:
                 parts.append(f'Error: {self._translate_fs_function(func_info)}')
             if inode:
                 inode_hint = 'root directory' if inode == '2' else f'inode #{inode}'
                 parts.append(f'Affected: {inode_hint}')
 
-            if device_exists:
-                parts.append(f'Action: Run "fsck /dev/{device}" (unmount first) or check backup integrity')
+            if smart_health == 'FAILED':
+                parts.append(f'Action: Disk is failing. Run "fsck /dev/{device}" (unmount first) and plan replacement')
+            elif smart_health == 'PASSED':
+                parts.append(f'Note: SMART reports disk is healthy. This may be a transient error.')
             else:
-                parts.append('Note: Device not currently connected -- this may be a stale journal entry')
+                parts.append(f'Action: Run "fsck /dev/{device}" (unmount first) and check "smartctl -a /dev/{base_dev}"')
 
             enriched = '\n'.join(parts)
         else:
@@ -551,12 +578,13 @@ class JournalWatcher:
         """
         Detect disk I/O errors from kernel messages.
 
-        Cross-references SMART health before notifying:
-        - SMART PASSED -> no notification (transient controller event)
-        - SMART FAILED/UNKNOWN -> notify with enriched context
+        ONLY notifies if ALL conditions are met:
+          1. SMART reports FAILED for the affected disk.
+          2. The same disk was NOT already notified in the last 24 h.
 
-        Resolves ATA controller names to physical devices and identifies
-        the disk model/type/mountpoint for the user.
+        If SMART is PASSED or UNKNOWN (cannot verify), the error is
+        silently ignored -- transient ATA/SCSI bus noise is extremely
+        common and does not indicate disk failure.
         """
         if syslog_id != 'kernel' and priority > 3:
             return
@@ -570,55 +598,61 @@ class JournalWatcher:
         for pattern in io_patterns:
             match = re.search(pattern, msg)
-            if match:
-                raw_device = match.group(1) if match.lastindex else 'unknown'
-
-                # Resolve ATA port to physical disk name
-                if raw_device.startswith('ata'):
-                    resolved = self._resolve_ata_to_disk(raw_device)
-                else:
-                    # Strip partition number (sdb1 -> sdb)
-                    resolved = re.sub(r'\d+$', '', raw_device) if raw_device.startswith('sd') else raw_device
-
-                # Check SMART health -- if disk is healthy, this is transient noise
-                smart_health = self._quick_smart_health(resolved)
-                if smart_health == 'PASSED':
-                    # SMART says disk is fine, don't notify for transient ATA/SCSI events
-                    return
-
-                # SMART is FAILED or UNKNOWN -- this may be a real problem
-                device_info = self._identify_block_device(resolved)
-
-                # Build a clear, informative reason
-                parts = []
-                if smart_health == 'FAILED':
-                    parts.append(f'Disk /dev/{resolved}: I/O errors detected (SMART: FAILED)')
-                else:
-                    parts.append(f'Disk /dev/{resolved}: I/O errors detected (SMART: unable to verify)')
-
-                if device_info:
-                    parts.append(f'Device: {device_info}')
-                elif resolved.startswith('ata'):
-                    parts.append(f'Device: ATA controller {raw_device} (could not resolve to physical disk)')
-                else:
-                    parts.append(f'Device: /dev/{resolved} (not currently detected -- may be disconnected or temporary)')
-
-                # Extract useful detail from the raw kernel message
-                detail = self._translate_ata_error(msg)
-                if detail:
-                    parts.append(f'Detail: {detail}')
-
-                parts.append('Action: Check disk health with "smartctl -a /dev/{}" and consider replacement if SMART reports failures'.format(resolved))
-
-                enriched = '\n'.join(parts)
-
-                dev_display = resolved if resolved.startswith('/dev/') else f'/dev/{resolved}'
-                self._emit('disk_io_error', 'CRITICAL', {
-                    'device': dev_display,
-                    'reason': enriched,
-                    'hostname': self._hostname,
-                }, entity='disk', entity_id=resolved)
+            if not match:
+                continue
+
+            raw_device = match.group(1) if match.lastindex else 'unknown'
+
+            # Resolve ATA port to physical disk name
+            if raw_device.startswith('ata'):
+                resolved = self._resolve_ata_to_disk(raw_device)
+            else:
+                resolved = re.sub(r'\d+$', '', raw_device) if raw_device.startswith('sd') else raw_device
+
+            # ── Gate 1: SMART must confirm disk failure ──
+            # If the disk is healthy (PASSED) or we can't verify
+            # (UNKNOWN / unresolvable ATA port), do NOT notify.
+ smart_health = self._quick_smart_health(resolved) + if smart_health != 'FAILED': return + + # ── Gate 2: 24-hour dedup per device ── + now = time.time() + last_notified = self._disk_io_notified.get(resolved, 0) + if now - last_notified < self._DISK_IO_COOLDOWN: + return # Already notified for this disk recently + self._disk_io_notified[resolved] = now + + # ── Build enriched notification ── + device_info = self._identify_block_device(resolved) + + parts = [] + parts.append(f'Disk /dev/{resolved}: I/O errors detected') + parts.append('SMART status: FAILED -- disk is failing') + + if device_info: + parts.append(f'Device: {device_info}') + else: + parts.append(f'Device: /dev/{resolved}') + + # Translate the raw kernel error code + detail = self._translate_ata_error(msg) + if detail: + parts.append(f'Error detail: {detail}') + + parts.append(f'Action: Replace disk /dev/{resolved} as soon as possible.') + parts.append(f' Check details: smartctl -a /dev/{resolved}') + + enriched = '\n'.join(parts) + dev_display = f'/dev/{resolved}' + + self._emit('disk_io_error', 'CRITICAL', { + 'device': dev_display, + 'reason': enriched, + 'hostname': self._hostname, + 'smart_status': 'FAILED', + }, entity='disk', entity_id=resolved) + return def _resolve_ata_to_disk(self, ata_port: str) -> str: """Resolve an ATA port name (ata8) to a physical disk name (sda).""" @@ -705,50 +739,66 @@ class JournalWatcher: return '' def _check_backup_start(self, msg: str, syslog_id: str): - """Detect backup job start from pvedaemon journal messages. + """Detect backup job start from journal messages. - Matches: INFO: starting new backup job: vzdump 110 --storage PBS-Cloud --mode stop ... + Matches multiple formats: + - pvedaemon: "INFO: starting new backup job: vzdump 110 --storage PBS-Cloud --mode stop ..." + - pvesh: "INFO: starting new backup job: vzdump 104 --mode stop --storage PBS-Cloud ..." + - vzdump: "starting new backup job: vzdump 110 --storage PBS-Cloud --mode stop ..." 
+ - vzdump: "INFO: Starting Backup of VM 110 (qemu)" (per-guest fallback) - PVE always emits this message from pvedaemon for BOTH scheduled and - manual backups. It contains the full guest list and all parameters. - The UPID "starting task" message is ignored because it arrives first - but lacks storage/mode/compression details. + PVE emits from pvedaemon for scheduled backups, from pvesh for + API/CLI-triggered backups, and from vzdump for the per-guest lines. """ - if syslog_id != 'pvedaemon': + if syslog_id not in ('pvedaemon', 'pvesh', 'vzdump', ''): return - match = re.match(r'INFO: starting new backup job: vzdump\s+(.*)', msg) + # Primary pattern: full vzdump command with all arguments + # Matches both "INFO: starting new backup job: vzdump ..." and + # "starting new backup job: vzdump ..." + match = re.match(r'(?:INFO:\s*)?starting new backup job:\s*vzdump\s+(.*)', msg, re.IGNORECASE) + + # Fallback: vzdump also emits per-guest messages like: + # "INFO: Starting Backup of VM 104 (lxc)" + # These fire for EACH guest when a multi-guest vzdump job runs. + # We only use this if the primary pattern didn't match. 
+ fallback_guest = None if not match: - return + fb = re.match(r'(?:INFO:\s*)?Starting Backup of VM (\d+)\s+\((lxc|qemu)\)', msg) + if fb: + fallback_guest = fb.group(1) + else: + return - raw_args = match.group(1) - - # Parse the vzdump arguments from the log message guests = [] storage = '' mode = '' compress = '' - args = raw_args.split() - i = 0 - while i < len(args): - arg = args[i] - if arg.isdigit(): - guests.append(arg) - elif arg == '--storage' and i + 1 < len(args): - storage = args[i + 1] + if match: + raw_args = match.group(1) + args = raw_args.split() + i = 0 + while i < len(args): + arg = args[i] + if arg.isdigit(): + guests.append(arg) + elif arg == '--storage' and i + 1 < len(args): + storage = args[i + 1] + i += 1 + elif arg == '--mode' and i + 1 < len(args): + mode = args[i + 1] + i += 1 + elif arg == '--compress' and i + 1 < len(args): + compress = args[i + 1] + i += 1 + elif arg == '--all' and i + 1 < len(args): + if args[i + 1] == '1': + guests = ['all'] + i += 1 i += 1 - elif arg == '--mode' and i + 1 < len(args): - mode = args[i + 1] - i += 1 - elif arg == '--compress' and i + 1 < len(args): - compress = args[i + 1] - i += 1 - elif arg == '--all' and i + 1 < len(args): - if args[i + 1] == '1': - guests = ['all'] - i += 1 - i += 1 + elif fallback_guest: + guests = [fallback_guest] # Build the notification body reason_parts = [] @@ -778,13 +828,18 @@ class JournalWatcher: reason = '\n'.join(reason_parts) if reason_parts else 'Backup job started' + # Use a stable entity_id that includes the guest list so that + # different backup jobs produce distinct fingerprints and don't + # dedup each other, while the SAME job doesn't fire twice. 
+ guest_key = '_'.join(sorted(guests)) if guests else 'unknown' + self._emit('backup_start', 'INFO', { 'vmid': ', '.join(guests), 'vmname': '', 'hostname': self._hostname, 'user': '', 'reason': reason, - }, entity='backup', entity_id='vzdump') + }, entity='backup', entity_id=f'vzdump_{guest_key}') def _resolve_vm_name(self, vmid: str) -> str: """Try to resolve a VMID to its name from PVE config files.""" diff --git a/AppImage/scripts/notification_manager.py b/AppImage/scripts/notification_manager.py index 8d1fe537..733aacc8 100644 --- a/AppImage/scripts/notification_manager.py +++ b/AppImage/scripts/notification_manager.py @@ -699,6 +699,15 @@ class NotificationManager: if event.severity == 'CRITICAL' and cooldown_str is None: cooldown = 60 + # Disk I/O and filesystem errors: 24h cooldown per fingerprint. + # Same as Proxmox's notification policy. The JournalWatcher already + # gates these through SMART verification + its own 24h dedup, but + # this acts as defense-in-depth in case a disk event arrives from + # another source (PollingCollector, hooks, etc.). + _DISK_EVENTS = {'disk_io_error', 'storage_unavailable'} + if event.event_type in _DISK_EVENTS and cooldown_str is None: + cooldown = 86400 # 24 hours + # Backup/replication events: each execution is unique and should # always be delivered. A 10s cooldown prevents exact duplicates # (webhook + tasks) but allows repeated backup jobs to report. diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py index ef3c298e..2d59dc65 100644 --- a/AppImage/scripts/notification_templates.py +++ b/AppImage/scripts/notification_templates.py @@ -480,7 +480,7 @@ TEMPLATES = { 'default_enabled': True, }, 'disk_io_error': { - 'title': '{hostname}: Disk I/O error on {device}', + 'title': '{hostname}: Disk failure detected on {device}', 'body': '{reason}', 'group': 'storage', 'default_enabled': True,