diff --git a/AppImage/scripts/notification_events.py b/AppImage/scripts/notification_events.py index d24cec08..1016e1c2 100644 --- a/AppImage/scripts/notification_events.py +++ b/AppImage/scripts/notification_events.py @@ -280,7 +280,7 @@ class JournalWatcher: return def _check_service_failure(self, msg: str, unit: str): - """Detect critical service failures.""" + """Detect critical service failures with enriched context.""" service_patterns = [ r'Failed to start (.+)', r'Unit (\S+) (?:entered failed state|failed)', @@ -291,13 +291,60 @@ class JournalWatcher: match = re.search(pattern, msg) if match: service_name = match.group(1) - self._emit('service_fail', 'WARNING', { + data = { 'service_name': service_name, - 'reason': msg[:200], + 'reason': msg[:300], 'hostname': self._hostname, - }, entity='node', entity_id=service_name) + } + + # Enrich PVE VM/CT services with guest name and context + # pve-container@101 -> LXC container 101 + # qemu-server@100 -> QEMU VM 100 + pve_match = re.match( + r'(pve-container|qemu-server)@(\d+)', service_name) + if pve_match: + svc_type = pve_match.group(1) + vmid = pve_match.group(2) + vm_name = self._resolve_vm_name(vmid) + + if svc_type == 'pve-container': + guest_type = 'LXC container' + else: + guest_type = 'QEMU VM' + + display = f"{guest_type} {vmid}" + if vm_name: + display = f"{guest_type} {vmid} ({vm_name})" + + data['service_name'] = service_name + data['vmid'] = vmid + data['vmname'] = vm_name + data['guest_type'] = guest_type + data['display_name'] = display + data['reason'] = ( + f"{display} failed to start.\n{msg[:300]}" + ) + + self._emit('service_fail', 'WARNING', data, + entity='node', entity_id=service_name) return + def _resolve_vm_name(self, vmid: str) -> str: + """Try to resolve VMID to a guest name from PVE config files.""" + if not vmid: + return '' + # Check QEMU configs + for base in ['/etc/pve/qemu-server', '/etc/pve/lxc']: + conf = os.path.join(base, f'{vmid}.conf') + try: + with open(conf) as f: + for line in f: + if line.startswith('hostname:') or line.startswith('name:'): + return line.split(':', 1)[1].strip() + except (OSError, IOError): + continue + return '' + def _check_disk_io(self, msg: str, syslog_id: str, priority: int): """Detect disk I/O errors from kernel messages.""" if syslog_id != 'kernel' and priority > 3: @@ -457,9 +504,6 @@ class TaskWatcher: self._thread: Optional[threading.Thread] = None self._hostname = _hostname() self._last_position = 0 - # Set by NotificationManager to point at ProxmoxHookWatcher._delivered - # so we can skip events the webhook already delivered with richer data. - self._webhook_delivered: Optional[dict] = None def start(self): if self._running: @@ -575,22 +619,13 @@ class TaskWatcher: # Determine entity type from task type entity = 'ct' if task_type.startswith('vz') else 'vm' - # ── Cross-source dedup: yield to PVE webhook for backup/replication ── - # The webhook delivers richer data (full logs, sizes, durations). - # If the webhook already delivered this event within 120s, skip. - # For backup events, PVE sends ONE webhook for the entire vzdump job - # (covering all VMs), while TaskWatcher sees individual per-VM tasks. - # So we check by event_type ONLY (no VMID) -- if ANY backup_complete - # arrived from webhook recently, skip ALL backup_complete from tasks. - _WEBHOOK_TYPES = {'backup_complete', 'backup_fail', 'backup_start', - 'replication_complete', 'replication_fail'} - if event_type in _WEBHOOK_TYPES and self._webhook_delivered: - import time as _time - # Check type-only key first (covers multi-VM jobs) - type_key = f"{event_type}:" - for dkey, dtime in self._webhook_delivered.items(): - if dkey.startswith(type_key) and (_time.time() - dtime) < 120: - return # Webhook already delivered this with richer data + # Backup and replication events are handled EXCLUSIVELY by the PVE + # webhook, which delivers much richer data (full logs, sizes, durations, + # filenames). TaskWatcher skips these entirely to avoid duplicates. + _WEBHOOK_EXCLUSIVE = {'backup_complete', 'backup_fail', 'backup_start', + 'replication_complete', 'replication_fail'} + if event_type in _WEBHOOK_EXCLUSIVE: + return self._queue.put(NotificationEvent( event_type, severity, data, source='tasks', @@ -1029,18 +1064,6 @@ class ProxmoxHookWatcher: if dur_m: data['duration'] = dur_m.group(1).strip() - # Record this event for cross-source dedup. - # TaskWatcher iterates this dict checking if any key with the same - # event_type prefix was delivered recently (within 120s). - import time - self._delivered[f"{event_type}:{entity_id}"] = time.time() - # Cleanup old entries (use del, NOT reassign -- TaskWatcher holds a ref) - if len(self._delivered) > 200: - cutoff = time.time() - 300 - stale = [k for k, v in self._delivered.items() if v < cutoff] - for k in stale: - del self._delivered[k] - event = NotificationEvent( event_type=event_type, severity=severity, diff --git a/AppImage/scripts/notification_manager.py b/AppImage/scripts/notification_manager.py index b16b9c1e..68c10965 100644 --- a/AppImage/scripts/notification_manager.py +++ b/AppImage/scripts/notification_manager.py @@ -874,10 +874,6 @@ class NotificationManager: """Process incoming Proxmox webhook. Delegates to ProxmoxHookWatcher.""" if not self._hook_watcher: self._hook_watcher = ProxmoxHookWatcher(self._event_queue) - # Share the webhook's delivery record with TaskWatcher - # so tasks can yield to richer webhook data for backup/replication. - if self._task_watcher: - self._task_watcher._webhook_delivered = self._hook_watcher._delivered return self._hook_watcher.process_webhook(payload) def get_webhook_secret(self) -> str: diff --git a/AppImage/scripts/notification_templates.py b/AppImage/scripts/notification_templates.py index 5a5438ff..9c2c41d9 100644 --- a/AppImage/scripts/notification_templates.py +++ b/AppImage/scripts/notification_templates.py @@ -103,36 +103,40 @@ def _format_vzdump_body(parsed: Dict[str, Any], is_success: bool) -> str: for vm in parsed.get('vms', []): status = vm.get('status', '').lower() - if status == 'ok': - icon = '\u2705' # green check - else: - icon = '\u274C' # red X + icon = '\u2705' if status == 'ok' else '\u274C' - vm_line = f"{icon} ID {vm['vmid']} ({vm['name']})" - parts.append(vm_line) + parts.append(f"{icon} ID {vm['vmid']} ({vm['name']})") + details = [] if vm.get('size'): - parts.append(f" Size: {vm['size']}") + details.append(f"Size: {vm['size']}") if vm.get('time'): - parts.append(f" Duration: {vm['time']}") + details.append(f"Duration: {vm['time']}") if vm.get('filename'): - parts.append(f" File: {vm['filename']}") + details.append(f"File: {vm['filename']}") + if details: + parts.append(' | '.join(details)) parts.append('') # blank line between VMs # Summary vm_count = parsed.get('vm_count', 0) if vm_count > 0 or parsed.get('total_size'): - parts.append('Summary:') + ok_count = sum(1 for v in parsed.get('vms', []) + if v.get('status', '').lower() == 'ok') + fail_count = vm_count - ok_count + + summary_parts = [] if vm_count: - ok_count = sum(1 for v in parsed.get('vms', []) if v.get('status', '').lower() == 'ok') - fail_count = vm_count - ok_count - parts.append(f" Total: {vm_count} backup(s)") - if fail_count: - parts.append(f" Failed: {fail_count}") + summary_parts.append(f"{vm_count} backup(s)") + if fail_count: + summary_parts.append(f"{fail_count} failed") if parsed.get('total_size'): - parts.append(f" Total size: {parsed['total_size']}") + summary_parts.append(f"Total: {parsed['total_size']}") if parsed.get('total_time'): - parts.append(f" Total time: {parsed['total_time']}") + summary_parts.append(f"Time: {parsed['total_time']}") + + if summary_parts: + parts.append('--- ' + ' | '.join(summary_parts)) return '\n'.join(parts) @@ -422,7 +426,7 @@ TEMPLATES = { }, 'service_fail': { 'title': '{hostname}: Service failed - {service_name}', - 'body': 'Service {service_name} has failed.\n{reason}', + 'body': '{reason}', 'group': 'system', 'default_enabled': True, }, @@ -619,16 +623,9 @@ def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]: except (KeyError, ValueError): body_text = template['body'] - # Clean up: remove empty lines and consecutive duplicate lines - cleaned_lines = [] - for line in body_text.split('\n'): - stripped = line.strip() - if not stripped: - continue - if cleaned_lines and stripped == cleaned_lines[-1]: - continue # skip consecutive duplicate - cleaned_lines.append(stripped) - body_text = '\n'.join(cleaned_lines) + # Clean up: collapse runs of 3+ blank lines into 1, remove trailing whitespace + import re as _re + body_text = _re.sub(r'\n{3,}', '\n\n', body_text.strip()) severity = variables.get('severity', 'INFO') group = template.get('group', 'system')