Update notification service

This commit is contained in:
MacRimi
2026-02-24 19:52:27 +01:00
parent 46c04e5a81
commit f7fd728683
3 changed files with 83 additions and 67 deletions

View File

@@ -280,7 +280,7 @@ class JournalWatcher:
return
def _check_service_failure(self, msg: str, unit: str):
"""Detect critical service failures."""
"""Detect critical service failures with enriched context."""
service_patterns = [
r'Failed to start (.+)',
r'Unit (\S+) (?:entered failed state|failed)',
@@ -291,13 +291,60 @@ class JournalWatcher:
match = re.search(pattern, msg)
if match:
service_name = match.group(1)
self._emit('service_fail', 'WARNING', {
data = {
'service_name': service_name,
'reason': msg[:200],
'reason': msg[:300],
'hostname': self._hostname,
}, entity='node', entity_id=service_name)
}
# Enrich PVE VM/CT services with guest name and context
# pve-container@101 -> LXC container 101
# qemu-server@100 -> QEMU VM 100
pve_match = re.match(
r'(pve-container|qemu-server)@(\d+)', service_name)
if pve_match:
svc_type = pve_match.group(1)
vmid = pve_match.group(2)
vm_name = self._resolve_vm_name(vmid)
if svc_type == 'pve-container':
guest_type = 'LXC container'
else:
guest_type = 'QEMU VM'
display = f"{guest_type} {vmid}"
if vm_name:
display = f"{guest_type} {vmid} ({vm_name})"
data['service_name'] = service_name
data['vmid'] = vmid
data['vmname'] = vm_name
data['guest_type'] = guest_type
data['display_name'] = display
data['reason'] = (
f"{display} failed to start.\n{msg[:300]}"
)
self._emit('service_fail', 'WARNING', data,
entity='node', entity_id=service_name)
return
def _resolve_vm_name(self, vmid: str) -> str:
"""Try to resolve VMID to a guest name from PVE config files."""
if not vmid:
return ''
# Check QEMU configs
for base in ['/etc/pve/qemu-server', '/etc/pve/lxc']:
conf = os.path.join(base, f'{vmid}.conf')
try:
with open(conf) as f:
for line in f:
if line.startswith('hostname:') or line.startswith('name:'):
return line.split(':', 1)[1].strip()
except (OSError, IOError):
continue
return ''
def _check_disk_io(self, msg: str, syslog_id: str, priority: int):
"""Detect disk I/O errors from kernel messages."""
if syslog_id != 'kernel' and priority > 3:
@@ -457,9 +504,6 @@ class TaskWatcher:
self._thread: Optional[threading.Thread] = None
self._hostname = _hostname()
self._last_position = 0
# Set by NotificationManager to point at ProxmoxHookWatcher._delivered
# so we can skip events the webhook already delivered with richer data.
self._webhook_delivered: Optional[dict] = None
def start(self):
if self._running:
@@ -575,22 +619,13 @@ class TaskWatcher:
# Determine entity type from task type
entity = 'ct' if task_type.startswith('vz') else 'vm'
# ── Cross-source dedup: yield to PVE webhook for backup/replication ──
# The webhook delivers richer data (full logs, sizes, durations).
# If the webhook already delivered this event within 120s, skip.
# For backup events, PVE sends ONE webhook for the entire vzdump job
# (covering all VMs), while TaskWatcher sees individual per-VM tasks.
# So we check by event_type ONLY (no VMID) -- if ANY backup_complete
# arrived from webhook recently, skip ALL backup_complete from tasks.
_WEBHOOK_TYPES = {'backup_complete', 'backup_fail', 'backup_start',
'replication_complete', 'replication_fail'}
if event_type in _WEBHOOK_TYPES and self._webhook_delivered:
import time as _time
# Check type-only key first (covers multi-VM jobs)
type_key = f"{event_type}:"
for dkey, dtime in self._webhook_delivered.items():
if dkey.startswith(type_key) and (_time.time() - dtime) < 120:
return # Webhook already delivered this with richer data
# Backup and replication events are handled EXCLUSIVELY by the PVE
# webhook, which delivers much richer data (full logs, sizes, durations,
# filenames). TaskWatcher skips these entirely to avoid duplicates.
_WEBHOOK_EXCLUSIVE = {'backup_complete', 'backup_fail', 'backup_start',
'replication_complete', 'replication_fail'}
if event_type in _WEBHOOK_EXCLUSIVE:
return
self._queue.put(NotificationEvent(
event_type, severity, data, source='tasks',
@@ -1029,18 +1064,6 @@ class ProxmoxHookWatcher:
if dur_m:
data['duration'] = dur_m.group(1).strip()
# Record this event for cross-source dedup.
# TaskWatcher iterates this dict checking if any key with the same
# event_type prefix was delivered recently (within 120s).
import time
self._delivered[f"{event_type}:{entity_id}"] = time.time()
# Cleanup old entries (use del, NOT reassign -- TaskWatcher holds a ref)
if len(self._delivered) > 200:
cutoff = time.time() - 300
stale = [k for k, v in self._delivered.items() if v < cutoff]
for k in stale:
del self._delivered[k]
event = NotificationEvent(
event_type=event_type,
severity=severity,

View File

@@ -874,10 +874,6 @@ class NotificationManager:
"""Process incoming Proxmox webhook. Delegates to ProxmoxHookWatcher."""
if not self._hook_watcher:
self._hook_watcher = ProxmoxHookWatcher(self._event_queue)
# Share the webhook's delivery record with TaskWatcher
# so tasks can yield to richer webhook data for backup/replication.
if self._task_watcher:
self._task_watcher._webhook_delivered = self._hook_watcher._delivered
return self._hook_watcher.process_webhook(payload)
def get_webhook_secret(self) -> str:

View File

@@ -103,36 +103,40 @@ def _format_vzdump_body(parsed: Dict[str, Any], is_success: bool) -> str:
for vm in parsed.get('vms', []):
status = vm.get('status', '').lower()
if status == 'ok':
icon = '\u2705' # green check
else:
icon = '\u274C' # red X
icon = '\u2705' if status == 'ok' else '\u274C'
vm_line = f"{icon} ID {vm['vmid']} ({vm['name']})"
parts.append(vm_line)
parts.append(f"{icon} ID {vm['vmid']} ({vm['name']})")
details = []
if vm.get('size'):
parts.append(f" Size: {vm['size']}")
details.append(f"Size: {vm['size']}")
if vm.get('time'):
parts.append(f" Duration: {vm['time']}")
details.append(f"Duration: {vm['time']}")
if vm.get('filename'):
parts.append(f" File: {vm['filename']}")
details.append(f"File: {vm['filename']}")
if details:
parts.append(' | '.join(details))
parts.append('') # blank line between VMs
# Summary
vm_count = parsed.get('vm_count', 0)
if vm_count > 0 or parsed.get('total_size'):
parts.append('Summary:')
ok_count = sum(1 for v in parsed.get('vms', [])
if v.get('status', '').lower() == 'ok')
fail_count = vm_count - ok_count
summary_parts = []
if vm_count:
ok_count = sum(1 for v in parsed.get('vms', []) if v.get('status', '').lower() == 'ok')
fail_count = vm_count - ok_count
parts.append(f" Total: {vm_count} backup(s)")
if fail_count:
parts.append(f" Failed: {fail_count}")
summary_parts.append(f"{vm_count} backup(s)")
if fail_count:
summary_parts.append(f"{fail_count} failed")
if parsed.get('total_size'):
parts.append(f" Total size: {parsed['total_size']}")
summary_parts.append(f"Total: {parsed['total_size']}")
if parsed.get('total_time'):
parts.append(f" Total time: {parsed['total_time']}")
summary_parts.append(f"Time: {parsed['total_time']}")
if summary_parts:
parts.append('--- ' + ' | '.join(summary_parts))
return '\n'.join(parts)
@@ -422,7 +426,7 @@ TEMPLATES = {
},
'service_fail': {
'title': '{hostname}: Service failed - {service_name}',
'body': 'Service {service_name} has failed.\n{reason}',
'body': '{reason}',
'group': 'system',
'default_enabled': True,
},
@@ -619,16 +623,9 @@ def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
except (KeyError, ValueError):
body_text = template['body']
# Clean up: remove empty lines and consecutive duplicate lines
cleaned_lines = []
for line in body_text.split('\n'):
stripped = line.strip()
if not stripped:
continue
if cleaned_lines and stripped == cleaned_lines[-1]:
continue # skip consecutive duplicate
cleaned_lines.append(stripped)
body_text = '\n'.join(cleaned_lines)
# Clean up: collapse runs of 2+ blank lines into 1 and strip leading/trailing whitespace from the body
import re as _re
body_text = _re.sub(r'\n{3,}', '\n\n', body_text.strip())
severity = variables.get('severity', 'INFO')
group = template.get('group', 'system')