Update notification service

This commit is contained in:
MacRimi
2026-03-01 22:29:58 +01:00
parent dc52f4c692
commit 0dfb35730f
3 changed files with 160 additions and 96 deletions
+150 -95
View File
@@ -111,6 +111,10 @@ class JournalWatcher:
# Dedup: track recent events to avoid duplicates # Dedup: track recent events to avoid duplicates
self._recent_events: Dict[str, float] = {} self._recent_events: Dict[str, float] = {}
self._dedup_window = 30 # seconds self._dedup_window = 30 # seconds
# 24h anti-cascade for disk I/O + filesystem errors (keyed by device name)
self._disk_io_notified: Dict[str, float] = {}
self._DISK_IO_COOLDOWN = 86400 # 24 hours
def start(self): def start(self):
"""Start the journal watcher thread.""" """Start the journal watcher thread."""
@@ -333,20 +337,41 @@ class JournalWatcher:
dev_match = re.search(r'device\s+(\S+?)\)?:', msg) dev_match = re.search(r'device\s+(\S+?)\)?:', msg)
device = dev_match.group(1).rstrip(')') if dev_match else 'unknown' device = dev_match.group(1).rstrip(')') if dev_match else 'unknown'
# Dedup by device: all EXT4 errors on sdb1 share ONE notification # Dedup by device: all FS errors on sdb1 share ONE notification
entity = 'disk' entity = 'disk'
entity_id = f'fs_{device}' entity_id = f'fs_{device}'
# Check if the device physically exists to calibrate severity. # ── 24h dedup for filesystem errors per device ──
# A disconnected USB / temp device should NOT be CRITICAL. now_fs = time.time()
fs_dedup_key = f'fs_{device}'
last_fs_notified = self._disk_io_notified.get(fs_dedup_key, 0)
if now_fs - last_fs_notified < self._DISK_IO_COOLDOWN:
return # Already notified for this device recently
# ── SMART + device existence gating ──
import os as _os import os as _os
base_dev = re.sub(r'\d+$', '', device) if device != 'unknown' else '' base_dev = re.sub(r'\d+$', '', device) if device != 'unknown' else ''
device_exists = base_dev and _os.path.exists(f'/dev/{base_dev}') device_exists = base_dev and _os.path.exists(f'/dev/{base_dev}')
if not device_exists and device != 'unknown': if not device_exists and device != 'unknown':
# Device not present -- downgrade to WARNING # Device not present -- silently ignore (disconnected USB, etc.)
return
# Cross-reference SMART before deciding severity
smart_health = self._quick_smart_health(base_dev) if base_dev else 'UNKNOWN'
if smart_health == 'PASSED':
# SMART healthy -- transient FS error, don't alarm
severity = 'INFO'
elif smart_health == 'FAILED':
severity = 'CRITICAL'
else:
# UNKNOWN -- can't verify, be conservative
severity = 'WARNING' severity = 'WARNING'
# Mark dedup timestamp now that we'll send
self._disk_io_notified[fs_dedup_key] = now_fs
# Identify what this device is (model, type, mountpoint) # Identify what this device is (model, type, mountpoint)
device_info = self._identify_block_device(device) device_info = self._identify_block_device(device)
@@ -356,21 +381,23 @@ class JournalWatcher:
inode_match = re.search(r'inode\s+#?(\d+)', msg) inode_match = re.search(r'inode\s+#?(\d+)', msg)
inode = inode_match.group(1) if inode_match else '' inode = inode_match.group(1) if inode_match else ''
parts = [f'{fs_type} filesystem corruption on /dev/{device}'] parts = [f'{fs_type} filesystem error on /dev/{device}']
# Add device identification so the user knows what this device is
if device_info: if device_info:
parts.append(f'Device: {device_info}') parts.append(f'Device: {device_info}')
else: else:
parts.append(f'Device: /dev/{device} (not currently detected -- may be a disconnected USB or temporary device)') parts.append(f'Device: /dev/{device}')
parts.append(f'SMART status: {smart_health}')
if func_info: if func_info:
parts.append(f'Error: {self._translate_fs_function(func_info)}') parts.append(f'Error: {self._translate_fs_function(func_info)}')
if inode: if inode:
inode_hint = 'root directory' if inode == '2' else f'inode #{inode}' inode_hint = 'root directory' if inode == '2' else f'inode #{inode}'
parts.append(f'Affected: {inode_hint}') parts.append(f'Affected: {inode_hint}')
if device_exists: if smart_health == 'FAILED':
parts.append(f'Action: Run "fsck /dev/{device}" (unmount first) or check backup integrity') parts.append(f'Action: Disk is failing. Run "fsck /dev/{device}" (unmount first) and plan replacement')
elif smart_health == 'PASSED':
parts.append(f'Note: SMART reports disk is healthy. This may be a transient error.')
else: else:
parts.append('Note: Device not currently connected -- this may be a stale journal entry') parts.append(f'Action: Run "fsck /dev/{device}" (unmount first) and check "smartctl -a /dev/{base_dev}"')
enriched = '\n'.join(parts) enriched = '\n'.join(parts)
else: else:
@@ -551,12 +578,13 @@ class JournalWatcher:
""" """
Detect disk I/O errors from kernel messages. Detect disk I/O errors from kernel messages.
Cross-references SMART health before notifying: ONLY notifies if ALL conditions are met:
- SMART PASSED -> no notification (transient controller event) 1. SMART reports FAILED for the affected disk.
- SMART FAILED/UNKNOWN -> notify with enriched context 2. The same disk was NOT already notified in the last 24 h.
Resolves ATA controller names to physical devices and identifies If SMART is PASSED or UNKNOWN (cannot verify), the error is
the disk model/type/mountpoint for the user. silently ignored -- transient ATA/SCSI bus noise is extremely
common and does not indicate disk failure.
""" """
if syslog_id != 'kernel' and priority > 3: if syslog_id != 'kernel' and priority > 3:
return return
@@ -570,55 +598,61 @@ class JournalWatcher:
for pattern in io_patterns: for pattern in io_patterns:
match = re.search(pattern, msg) match = re.search(pattern, msg)
if match: if not match:
raw_device = match.group(1) if match.lastindex else 'unknown' continue
# Resolve ATA port to physical disk name raw_device = match.group(1) if match.lastindex else 'unknown'
if raw_device.startswith('ata'):
resolved = self._resolve_ata_to_disk(raw_device) # Resolve ATA port to physical disk name
else: if raw_device.startswith('ata'):
# Strip partition number (sdb1 -> sdb) resolved = self._resolve_ata_to_disk(raw_device)
resolved = re.sub(r'\d+$', '', raw_device) if raw_device.startswith('sd') else raw_device else:
resolved = re.sub(r'\d+$', '', raw_device) if raw_device.startswith('sd') else raw_device
# Check SMART health -- if disk is healthy, this is transient noise
smart_health = self._quick_smart_health(resolved) # ── Gate 1: SMART must confirm disk failure ──
if smart_health == 'PASSED': # If the disk is healthy (PASSED) or we can't verify
# SMART says disk is fine, don't notify for transient ATA/SCSI events # (UNKNOWN / unresolvable ATA port), do NOT notify.
return smart_health = self._quick_smart_health(resolved)
if smart_health != 'FAILED':
# SMART is FAILED or UNKNOWN -- this may be a real problem
device_info = self._identify_block_device(resolved)
# Build a clear, informative reason
parts = []
if smart_health == 'FAILED':
parts.append(f'Disk /dev/{resolved}: I/O errors detected (SMART: FAILED)')
else:
parts.append(f'Disk /dev/{resolved}: I/O errors detected (SMART: unable to verify)')
if device_info:
parts.append(f'Device: {device_info}')
elif resolved.startswith('ata'):
parts.append(f'Device: ATA controller {raw_device} (could not resolve to physical disk)')
else:
parts.append(f'Device: /dev/{resolved} (not currently detected -- may be disconnected or temporary)')
# Extract useful detail from the raw kernel message
detail = self._translate_ata_error(msg)
if detail:
parts.append(f'Detail: {detail}')
parts.append('Action: Check disk health with "smartctl -a /dev/{}" and consider replacement if SMART reports failures'.format(resolved))
enriched = '\n'.join(parts)
dev_display = resolved if resolved.startswith('/dev/') else f'/dev/{resolved}'
self._emit('disk_io_error', 'CRITICAL', {
'device': dev_display,
'reason': enriched,
'hostname': self._hostname,
}, entity='disk', entity_id=resolved)
return return
# ── Gate 2: 24-hour dedup per device ──
now = time.time()
last_notified = self._disk_io_notified.get(resolved, 0)
if now - last_notified < self._DISK_IO_COOLDOWN:
return # Already notified for this disk recently
self._disk_io_notified[resolved] = now
# ── Build enriched notification ──
device_info = self._identify_block_device(resolved)
parts = []
parts.append(f'Disk /dev/{resolved}: I/O errors detected')
parts.append('SMART status: FAILED -- disk is failing')
if device_info:
parts.append(f'Device: {device_info}')
else:
parts.append(f'Device: /dev/{resolved}')
# Translate the raw kernel error code
detail = self._translate_ata_error(msg)
if detail:
parts.append(f'Error detail: {detail}')
parts.append(f'Action: Replace disk /dev/{resolved} as soon as possible.')
parts.append(f' Check details: smartctl -a /dev/{resolved}')
enriched = '\n'.join(parts)
dev_display = f'/dev/{resolved}'
self._emit('disk_io_error', 'CRITICAL', {
'device': dev_display,
'reason': enriched,
'hostname': self._hostname,
'smart_status': 'FAILED',
}, entity='disk', entity_id=resolved)
return
def _resolve_ata_to_disk(self, ata_port: str) -> str: def _resolve_ata_to_disk(self, ata_port: str) -> str:
"""Resolve an ATA port name (ata8) to a physical disk name (sda).""" """Resolve an ATA port name (ata8) to a physical disk name (sda)."""
@@ -705,50 +739,66 @@ class JournalWatcher:
return '' return ''
def _check_backup_start(self, msg: str, syslog_id: str): def _check_backup_start(self, msg: str, syslog_id: str):
"""Detect backup job start from pvedaemon journal messages. """Detect backup job start from journal messages.
Matches: INFO: starting new backup job: vzdump 110 --storage PBS-Cloud --mode stop ... Matches multiple formats:
- pvedaemon: "INFO: starting new backup job: vzdump 110 --storage PBS-Cloud --mode stop ..."
- pvesh: "INFO: starting new backup job: vzdump 104 --mode stop --storage PBS-Cloud ..."
- vzdump: "starting new backup job: vzdump 110 --storage PBS-Cloud --mode stop ..."
- vzdump: "INFO: Starting Backup of VM 110 (qemu)" (per-guest fallback)
PVE always emits this message from pvedaemon for BOTH scheduled and PVE emits from pvedaemon for scheduled backups, from pvesh for
manual backups. It contains the full guest list and all parameters. API/CLI-triggered backups, and from vzdump for the per-guest lines.
The UPID "starting task" message is ignored because it arrives first
but lacks storage/mode/compression details.
""" """
if syslog_id != 'pvedaemon': if syslog_id not in ('pvedaemon', 'pvesh', 'vzdump', ''):
return return
match = re.match(r'INFO: starting new backup job: vzdump\s+(.*)', msg) # Primary pattern: full vzdump command with all arguments
# Matches both "INFO: starting new backup job: vzdump ..." and
# "starting new backup job: vzdump ..."
match = re.match(r'(?:INFO:\s*)?starting new backup job:\s*vzdump\s+(.*)', msg, re.IGNORECASE)
# Fallback: vzdump also emits per-guest messages like:
# "INFO: Starting Backup of VM 104 (lxc)"
# These fire for EACH guest when a multi-guest vzdump job runs.
# We only use this if the primary pattern didn't match.
fallback_guest = None
if not match: if not match:
return fb = re.match(r'(?:INFO:\s*)?Starting Backup of VM (\d+)\s+\((lxc|qemu)\)', msg)
if fb:
fallback_guest = fb.group(1)
else:
return
raw_args = match.group(1)
# Parse the vzdump arguments from the log message
guests = [] guests = []
storage = '' storage = ''
mode = '' mode = ''
compress = '' compress = ''
args = raw_args.split() if match:
i = 0 raw_args = match.group(1)
while i < len(args): args = raw_args.split()
arg = args[i] i = 0
if arg.isdigit(): while i < len(args):
guests.append(arg) arg = args[i]
elif arg == '--storage' and i + 1 < len(args): if arg.isdigit():
storage = args[i + 1] guests.append(arg)
elif arg == '--storage' and i + 1 < len(args):
storage = args[i + 1]
i += 1
elif arg == '--mode' and i + 1 < len(args):
mode = args[i + 1]
i += 1
elif arg == '--compress' and i + 1 < len(args):
compress = args[i + 1]
i += 1
elif arg == '--all' and i + 1 < len(args):
if args[i + 1] == '1':
guests = ['all']
i += 1
i += 1 i += 1
elif arg == '--mode' and i + 1 < len(args): elif fallback_guest:
mode = args[i + 1] guests = [fallback_guest]
i += 1
elif arg == '--compress' and i + 1 < len(args):
compress = args[i + 1]
i += 1
elif arg == '--all' and i + 1 < len(args):
if args[i + 1] == '1':
guests = ['all']
i += 1
i += 1
# Build the notification body # Build the notification body
reason_parts = [] reason_parts = []
@@ -778,13 +828,18 @@ class JournalWatcher:
reason = '\n'.join(reason_parts) if reason_parts else 'Backup job started' reason = '\n'.join(reason_parts) if reason_parts else 'Backup job started'
# Use a stable entity_id that includes the guest list so that
# different backup jobs produce distinct fingerprints and don't
# dedup each other, while the SAME job doesn't fire twice.
guest_key = '_'.join(sorted(guests)) if guests else 'unknown'
self._emit('backup_start', 'INFO', { self._emit('backup_start', 'INFO', {
'vmid': ', '.join(guests), 'vmid': ', '.join(guests),
'vmname': '', 'vmname': '',
'hostname': self._hostname, 'hostname': self._hostname,
'user': '', 'user': '',
'reason': reason, 'reason': reason,
}, entity='backup', entity_id='vzdump') }, entity='backup', entity_id=f'vzdump_{guest_key}')
def _resolve_vm_name(self, vmid: str) -> str: def _resolve_vm_name(self, vmid: str) -> str:
"""Try to resolve a VMID to its name from PVE config files.""" """Try to resolve a VMID to its name from PVE config files."""
+9
View File
@@ -699,6 +699,15 @@ class NotificationManager:
if event.severity == 'CRITICAL' and cooldown_str is None: if event.severity == 'CRITICAL' and cooldown_str is None:
cooldown = 60 cooldown = 60
# Disk I/O and filesystem errors: 24h cooldown per fingerprint.
# Same as Proxmox's notification policy. The JournalWatcher already
# gates these through SMART verification + its own 24h dedup, but
# this acts as defense-in-depth in case a disk event arrives from
# another source (PollingCollector, hooks, etc.).
_DISK_EVENTS = {'disk_io_error', 'storage_unavailable'}
if event.event_type in _DISK_EVENTS and cooldown_str is None:
cooldown = 86400 # 24 hours
# Backup/replication events: each execution is unique and should # Backup/replication events: each execution is unique and should
# always be delivered. A 10s cooldown prevents exact duplicates # always be delivered. A 10s cooldown prevents exact duplicates
# (webhook + tasks) but allows repeated backup jobs to report. # (webhook + tasks) but allows repeated backup jobs to report.
+1 -1
View File
@@ -480,7 +480,7 @@ TEMPLATES = {
'default_enabled': True, 'default_enabled': True,
}, },
'disk_io_error': { 'disk_io_error': {
'title': '{hostname}: Disk I/O error on {device}', 'title': '{hostname}: Disk failure detected on {device}',
'body': '{reason}', 'body': '{reason}',
'group': 'storage', 'group': 'storage',
'default_enabled': True, 'default_enabled': True,