diff --git a/AppImage/scripts/flask_notification_routes.py b/AppImage/scripts/flask_notification_routes.py index 33db4619..c0a79c63 100644 --- a/AppImage/scripts/flask_notification_routes.py +++ b/AppImage/scripts/flask_notification_routes.py @@ -232,7 +232,7 @@ def _pve_remove_our_blocks(text, headers_to_remove): def _build_webhook_fallback(): """Build fallback manual commands for webhook setup.""" import base64 - body_tpl = '{"title":"{{ title }}","message":"{{ message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}"}' + body_tpl = '{"title":"{{ escape title }}","message":"{{ escape message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}","fields":{{ json fields }}}' body_b64 = base64.b64encode(body_tpl.encode()).decode() return [ "# 1. Append to END of /etc/pve/notifications.cfg", @@ -310,15 +310,13 @@ def setup_pve_webhook_core() -> dict: # PVE secret format is: secret name=key,value= # Neither is needed for localhost calls. - # PVE stores body as base64 in the config file. We encode it here - # so the config parser reads it correctly. - # Only use fields that ALWAYS exist: title, message, severity, timestamp. - # "type" and "hostname" are inside "fields" (not top-level) and - # {{ escape X }} fails hard on undefined fields, breaking the - # entire notification delivery for ALL targets. - # Our _classify() extracts type/hostname from the message content. + # PVE stores body as base64 in the config file. + # {{ escape title/message }} -- JSON-safe escaping of quotes/newlines. + # {{ json fields }} -- renders ALL PVE metadata as a JSON object + # (type, hostname, job-id). This is a single Handlebars helper + # that always works, even if fields is empty (renders {}). import base64 - body_template = '{"title":"{{ title }}","message":"{{ message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}"}' + body_template = '{"title":"{{ escape title }}","message":"{{ escape message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}","fields":{{ json fields }}}' body_b64 = base64.b64encode(body_template.encode()).decode() endpoint_block = ( diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py index 109da59f..98037d72 100644 --- a/AppImage/scripts/notification_events.py +++ b/AppImage/scripts/notification_events.py @@ -457,6 +457,9 @@ class TaskWatcher: self._thread: Optional[threading.Thread] = None self._hostname = _hostname() self._last_position = 0 + # Set by NotificationManager to point at ProxmoxHookWatcher._delivered + # so we can skip events the webhook already delivered with richer data. + self._webhook_delivered: Optional[dict] = None def start(self): if self._running: @@ -571,6 +574,19 @@ class TaskWatcher: # Determine entity type from task type entity = 'ct' if task_type.startswith('vz') else 'vm' + + # ── Cross-source dedup: yield to PVE webhook for backup/replication ── + # The webhook delivers richer data (full logs, sizes, durations). + # If the webhook already delivered this event within 120s, skip. + _WEBHOOK_TYPES = {'backup_complete', 'backup_fail', 'backup_start', + 'replication_complete', 'replication_fail'} + if event_type in _WEBHOOK_TYPES and self._webhook_delivered: + import time as _time + dedup_key = f"{event_type}:{vmid}" + last_webhook = self._webhook_delivered.get(dedup_key, 0) + if _time.time() - last_webhook < 120: + return # Webhook already delivered this with richer data + self._queue.put(NotificationEvent( event_type, severity, data, source='tasks', entity=entity, entity_id=vmid, @@ -925,111 +941,100 @@ class ProxmoxHookWatcher: def __init__(self, event_queue: Queue): self._queue = event_queue self._hostname = _hostname() + # Shared dict: TaskWatcher._webhook_delivered points here. + # Records (event_type:entity_id -> timestamp) for cross-source dedup. + self._delivered: Dict[str, float] = {} def process_webhook(self, payload: dict) -> dict: """Process an incoming Proxmox webhook payload. + The PVE webhook is the PRIMARY source for vzdump, replication, + fencing, package-updates and system-mail events. PVE sends rich + detail (full logs, sizes, durations) that TaskWatcher cannot match. + + Body template delivers: + {title, message, severity, timestamp, fields: {type, hostname, job-id}} + Returns: {'accepted': bool, 'event_type': str, 'event_id': str} - or {'accepted': False, 'error': str} """ if not payload: return {'accepted': False, 'error': 'Empty payload'} - # ── Normalise PVE native webhook format ── - # PVE sends: {title, message, severity, timestamp, fields: {type, hostname, job-id}} - # Our code expects: {type, severity, title, body, component, ...} - # Flatten `fields` into the top-level payload so _classify sees them. - if 'fields' in payload and isinstance(payload['fields'], dict): - fields = payload['fields'] - # Map PVE field names to our expected names - if 'type' in fields and 'type' not in payload: - payload['type'] = fields['type'] # vzdump, fencing, replication, etc. - if 'hostname' in fields: - payload['hostname'] = fields['hostname'] - if 'job-id' in fields: - payload['job_id'] = fields['job-id'] - # Merge remaining fields - for k, v in fields.items(): - if k not in payload: - payload[k] = v + # ── Extract structured PVE fields ── + fields = payload.get('fields') or {} + if isinstance(fields, str): + # Edge case: {{ json fields }} rendered as string instead of dict + try: + import json + fields = json.loads(fields) + except (json.JSONDecodeError, ValueError): + fields = {} - # PVE uses 'message' for the body text - if 'message' in payload and 'body' not in payload: - payload['body'] = payload['message'] + pve_type = fields.get('type', '').lower().strip() + pve_hostname = fields.get('hostname', self._hostname) + pve_job_id = fields.get('job-id', '') - # Extract common fields from PVE notification payload - notification_type = payload.get('type', payload.get('notification-type', '')) - severity_raw = payload.get('severity', payload.get('priority', 'info')) - title = payload.get('title', payload.get('subject', '')) - body = payload.get('body', payload.get('message', '')) - source_component = payload.get('component', payload.get('source', payload.get('type', ''))) + title = payload.get('title', '') + message = payload.get('message', payload.get('body', '')) + severity_raw = payload.get('severity', 'info').lower().strip() + timestamp = payload.get('timestamp', '') - # If 'type' is already a known template key, use it directly. - # This allows tests and internal callers to inject events by exact type - # without relying on _classify's keyword heuristics. - from notification_templates import TEMPLATES as _TMPL - if notification_type in _TMPL: - event_type = notification_type - entity = payload.get('entity', 'node') - entity_id = payload.get('entity_id', payload.get('vmid', '')) - else: - # Map to our event taxonomy via heuristic classification - event_type, entity, entity_id = self._classify( - notification_type, source_component, title, body, payload - ) + # ── Classify by PVE type (direct, no heuristics needed) ── + import re + event_type, entity, entity_id = self._classify_pve( + pve_type, severity_raw, title, message + ) - # Discard meta-events (overall status changes, update status, etc.) + # Discard meta-events if event_type == '_skip': return {'accepted': False, 'skipped': True, 'reason': 'Meta-event filtered'} severity = self._map_severity(severity_raw) - # Extract "reason" as extra detail, NOT the full body. - # Templates already have their own intro text (e.g. "{vmname} has failed."). - # The body from the webhook often starts with the same intro, so using the - # full body as {reason} causes duplication. We strip the first line (which - # is typically the title/summary) and keep only the extra detail lines. - reason = '' - if body: - body_lines = body.strip().split('\n') - # If more than 1 line, skip the first (summary) and use the rest - if len(body_lines) > 1: - reason = '\n'.join(body_lines[1:]).strip()[:500] - else: - # Single-line body: only use it as reason if it differs from title - if body.strip().lower() != (title or '').strip().lower(): - reason = body.strip()[:500] - + # ── Build rich data dict ── + # For webhook events, PVE's `message` IS the notification body. + # It contains full vzdump logs, package lists, error details, etc. + # We pass it as 'pve_message' so templates can use it directly. data = { - 'hostname': self._hostname, - 'reason': reason, + 'hostname': pve_hostname, + 'pve_type': pve_type, + 'pve_message': message, 'title': title, - 'source_component': source_component, - 'notification_type': notification_type, + 'job_id': pve_job_id, } - # Merge ALL extra fields from payload into data so template - # variables ({vmid}, {vmname}, {count}, {source_ip}, etc.) resolve. - _reserved = {'type', 'severity', 'priority', 'title', 'subject', - 'body', 'message', 'component', 'source'} - for key, val in payload.items(): - if key not in _reserved and key not in data: - data[key] = str(val) if val is not None else '' - # Skip event types that the TaskWatcher already captures with - # full details (VMID, VM name, size, duration, etc.). - # The PVE webhook sends inferior data for these (no VMID, no name). - # Only forward events that tasks/journal do NOT catch reliably. - _TASKS_HANDLED = { - 'backup_complete', 'backup_fail', 'backup_start', - 'snapshot_complete', 'snapshot_fail', - 'vm_start', 'vm_stop', 'vm_restart', 'vm_shutdown', - 'ct_start', 'ct_stop', - 'migration_start', 'migration_complete', 'migration_fail', - 'replication_complete', 'replication_fail', - } - if event_type in _TASKS_HANDLED: - return {'accepted': False, 'skipped': True, - 'reason': f'tasks_watcher_handles_{event_type}'} + # Extract VMID and VM name from message for vzdump events + if pve_type == 'vzdump' and message: + # PVE vzdump messages contain lines like: + # "INFO: Starting Backup of VM 100 (qemu)" + # "VMID Name Status Time Size Filename" + # "100 arch-linux OK 00:05:30 1.2G /path/to/file" + vmids = re.findall(r'(?:VM|CT)\s+(\d+)', message, re.IGNORECASE) + if vmids: + data['vmid'] = vmids[0] + entity_id = vmids[0] + # Try to extract VM name from the table line + name_m = re.search(r'(\d+)\s+(\S+)\s+(?:OK|ERROR|WARNINGS)', message) + if name_m: + data['vmname'] = name_m.group(2) + # Extract size from "Total size: X" + size_m = re.search(r'Total size:\s*(.+?)(?:\n|$)', message) + if size_m: + data['size'] = size_m.group(1).strip() + # Extract duration from "Total running time: X" + dur_m = re.search(r'Total running time:\s*(.+?)(?:\n|$)', message) + if dur_m: + data['duration'] = dur_m.group(1).strip() + + # Record in the shared delivered dict so TaskWatcher can dedup. + # TaskWatcher._webhook_delivered points at self._delivered (same object). + self._delivered[f"{event_type}:{entity_id}"] = time.time() + # Prune old entries periodically + if len(self._delivered) > 200: + cutoff = time.time() - 300 + to_del = [k for k, v in self._delivered.items() if v < cutoff] + for k in to_del: + del self._delivered[k] event = NotificationEvent( event_type=event_type, @@ -1044,50 +1049,28 @@ class ProxmoxHookWatcher: self._queue.put(event) return {'accepted': True, 'event_type': event_type, 'event_id': event.event_id} - def _classify(self, ntype: str, component: str, title: str, - body: str, payload: dict) -> tuple: - """Classify webhook payload into (event_type, entity, entity_id). + def _classify_pve(self, pve_type: str, severity: str, + title: str, message: str) -> tuple: + """Classify using PVE's structured fields.type. - Returns ('_skip', '', '') for meta-events we should discard. + Returns (event_type, entity, entity_id). """ title_lower = (title or '').lower() - body_lower = (body or '').lower() - component_lower = (component or '').lower() - # ── Skip PVE meta-events ── - # PVE sends "overall status changed from OK to WARNING" which is a meta - # aggregation event. Our own health monitor handles the underlying issues - # with better granularity, so we skip these to avoid noise/duplicates. + # Skip overall/updates status change meta-events if 'overall' in title_lower and ('changed' in title_lower or 'status' in title_lower): return '_skip', '', '' - - # ── Skip "updates changed" status events ── - # PVE sends "updates status changed from OK to WARNING" when apt updates - # are available. Our PollingCollector already handles update checks with - # proper detail (security count, package list) on a 24h cycle. if 'updates' in title_lower and ('changed' in title_lower or 'status' in title_lower): return '_skip', '', '' - # ── PVE native notification types ── - # When PVE sends via our webhook body template, fields.type is one of: - # vzdump, fencing, replication, package-updates, system-mail - pve_type = payload.get('type', '').lower().strip() - + # ── Direct classification by PVE type ── if pve_type == 'vzdump': - # Backup notification -- determine success or failure from severity - pve_sev = payload.get('severity', 'info').lower() - vmid = '' - # Try to extract VMID from title like "Backup of VM 100 (qemu)" - import re - m = re.search(r'VM\s+(\d+)|CT\s+(\d+)', title, re.IGNORECASE) - if m: - vmid = m.group(1) or m.group(2) or '' - if pve_sev == 'error': - return 'backup_fail', 'vm', vmid - return 'backup_complete', 'vm', vmid + if severity in ('error', 'err'): + return 'backup_fail', 'vm', '' + return 'backup_complete', 'vm', '' if pve_type == 'fencing': - return 'split_brain', 'node', payload.get('hostname', '') + return 'split_brain', 'node', '' if pve_type == 'replication': return 'replication_fail', 'vm', '' @@ -1096,121 +1079,29 @@ class ProxmoxHookWatcher: return 'update_available', 'node', '' if pve_type == 'system-mail': - # Forwarded system mail (e.g. from smartd) -- treat as system_problem - return 'system_problem', 'node', '' + return 'system_mail', 'node', '' - # VM / CT lifecycle events (if sent via webhook) - vmid = str(payload.get('vmid', '')) - if any(k in component_lower for k in ('qemu', 'lxc', 'vm', 'ct', 'container')): - if any(w in title_lower for w in ('start', 'running')): - etype = 'ct_start' if 'lxc' in component_lower or 'container' in component_lower else 'vm_start' - return etype, 'vm', vmid - if any(w in title_lower for w in ('stop', 'shutdown', 'down')): - etype = 'ct_stop' if 'lxc' in component_lower or 'container' in component_lower else 'vm_stop' - return etype, 'vm', vmid - if 'fail' in title_lower or 'crash' in title_lower or 'error' in body_lower: - return 'vm_fail', 'vm', vmid - if 'migrat' in title_lower: - return 'migration_complete', 'vm', vmid - return 'vm_start', 'vm', vmid + # ── Fallback for unknown/empty pve_type ── + # (e.g. test notifications, future PVE event types) + msg_lower = (message or '').lower() + text = f"{title_lower} {msg_lower}" - # Also check title for VM keywords if component is not specific - if vmid or any(k in title_lower for k in ('vm ', 'ct ', 'qemu', 'lxc')): - if 'start' in title_lower: - return 'vm_start', 'vm', vmid - if any(w in title_lower for w in ('stop', 'shutdown')): - return 'vm_stop', 'vm', vmid - if 'fail' in title_lower or 'crash' in title_lower: - return 'vm_fail', 'vm', vmid - - # Storage / SMART / ZFS / Ceph - if any(k in component_lower for k in ('smart', 'disk', 'zfs', 'ceph')): - entity_id = payload.get('device', payload.get('pool', '')) - if 'smart' in title_lower or 'smart' in body_lower: - return 'disk_io_error', 'disk', str(entity_id) - if 'zfs' in title_lower: - return 'disk_io_error', 'storage', str(entity_id) - return 'disk_space_low', 'storage', str(entity_id) - - # Replication - if 'replication' in component_lower or 'replication' in title_lower: - vmid = str(payload.get('vmid', '')) - if 'fail' in title_lower or 'error' in body_lower: - return 'replication_fail', 'vm', vmid - return 'replication_complete', 'vm', vmid - - # PBS (Proxmox Backup Server) - if 'pbs' in component_lower or 'backup' in component_lower: - vmid = str(payload.get('vmid', '')) - if 'fail' in title_lower or 'error' in body_lower: - return 'backup_fail', 'vm', vmid - if 'complete' in title_lower or 'success' in body_lower: - return 'backup_complete', 'vm', vmid - return 'backup_start', 'vm', vmid - - # Cluster / HA / Fencing / Corosync - if any(k in component_lower for k in ('cluster', 'ha', 'fencing', 'corosync')): - node = str(payload.get('node', '')) - if 'quorum' in title_lower or 'split' in body_lower: - return 'split_brain', 'cluster', node - if 'fencing' in title_lower: - return 'node_disconnect', 'cluster', node - return 'node_disconnect', 'cluster', node - - # APT / Updates - if 'apt' in component_lower or 'update' in title_lower: - return 'update_available', 'node', '' - - # Network - if 'network' in component_lower: - return 'network_down', 'network', '' - - # Security -- distinguish firewall from auth - if 'firewall' in component_lower or 'firewall' in title_lower: - return 'firewall_issue', 'node', '' - if any(k in component_lower for k in ('auth', 'security', 'pam', 'sshd')): - return 'auth_fail', 'user', '' - - # ── Text-based heuristics (for proxmox_hook where component is empty) ── - # Scan title + body for known keywords regardless of component. - text = f"{title_lower} {body_lower}" - - # Backup / vzdump if 'vzdump' in text or 'backup' in text: import re - m = re.search(r'(?:vm|ct|vmid)\s*(\d+)', text, re.IGNORECASE) + m = re.search(r'(?:vm|ct)\s+(\d+)', text, re.IGNORECASE) vmid = m.group(1) if m else '' - if any(w in text for w in ('fail', 'error', 'could not', 'unable')): + if any(w in text for w in ('fail', 'error')): return 'backup_fail', 'vm', vmid - if any(w in text for w in ('success', 'complete', 'finished', 'ok')): - return 'backup_complete', 'vm', vmid - return 'backup_start', 'vm', vmid + return 'backup_complete', 'vm', vmid - # Mail bounces - if 'could not be delivered' in text or 'mail system' in text or 'postmaster' in text: - return 'system_problem', 'node', '' - - # Disk / I/O - if any(w in text for w in ('i/o error', 'disk error', 'smart', 'bad sector', 'read error')): - return 'disk_io_error', 'disk', '' - - # Auth - if any(w in text for w in ('authentication fail', 'login fail', 'unauthorized', 'permission denied')): - return 'auth_fail', 'user', '' - - # Service failures - if any(w in text for w in ('service fail', 'failed to start', 'exited with error')): - return 'service_fail', 'node', '' - - # Replication if 'replication' in text: - if 'fail' in text or 'error' in text: - return 'replication_fail', 'vm', '' - return 'replication_complete', 'vm', '' + return 'replication_fail', 'vm', '' - # Fallback: system_problem generic + # Generic fallback return 'system_problem', 'node', '' + # Old _classify removed -- replaced by _classify_pve above. + @staticmethod def _map_severity(raw: str) -> str: raw_l = str(raw).lower() diff --git a/AppImage/scripts/notification_manager.py b/AppImage/scripts/notification_manager.py index 681cc0a3..772ce5c5 100644 --- a/AppImage/scripts/notification_manager.py +++ b/AppImage/scripts/notification_manager.py @@ -419,6 +419,14 @@ class NotificationManager: self._task_watcher = TaskWatcher(self._event_queue) self._polling_collector = PollingCollector(self._event_queue) + # Create hook_watcher eagerly so we can wire the cross-source dedup. + # TaskWatcher._webhook_delivered points at ProxmoxHookWatcher._delivered + # so TaskWatcher can skip backup/replication events the webhook already + # delivered with richer data (full logs, sizes, durations). + if not self._hook_watcher: + self._hook_watcher = ProxmoxHookWatcher(self._event_queue) + self._task_watcher._webhook_delivered = self._hook_watcher._delivered + self._journal_watcher.start() self._task_watcher.start() self._polling_collector.start() diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py index 6b1c238c..5d1047c4 100644 --- a/AppImage/scripts/notification_templates.py +++ b/AppImage/scripts/notification_templates.py @@ -302,6 +302,12 @@ TEMPLATES = { 'group': 'system', 'default_enabled': True, }, + 'system_mail': { + 'title': '{hostname}: System mail notification', + 'body': '{reason}', + 'group': 'system', + 'default_enabled': True, + }, 'service_fail': { 'title': '{hostname}: Service failed - {service_name}', 'body': 'Service {service_name} has failed.\n{reason}',