Update notification service

This commit is contained in:
MacRimi
2026-02-24 17:55:03 +01:00
parent 507f769357
commit 4182af75ff
4 changed files with 131 additions and 228 deletions

View File

@@ -232,7 +232,7 @@ def _pve_remove_our_blocks(text, headers_to_remove):
def _build_webhook_fallback():
"""Build fallback manual commands for webhook setup."""
import base64
body_tpl = '{"title":"{{ title }}","message":"{{ message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}"}'
body_tpl = '{"title":"{{ escape title }}","message":"{{ escape message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}","fields":{{ json fields }}}'
body_b64 = base64.b64encode(body_tpl.encode()).decode()
return [
"# 1. Append to END of /etc/pve/notifications.cfg",
@@ -310,15 +310,13 @@ def setup_pve_webhook_core() -> dict:
# PVE secret format is: secret name=key,value=<base64>
# Neither is needed for localhost calls.
# PVE stores body as base64 in the config file. We encode it here
# so the config parser reads it correctly.
# Only use fields that ALWAYS exist: title, message, severity, timestamp.
# "type" and "hostname" are inside "fields" (not top-level) and
# {{ escape X }} fails hard on undefined fields, breaking the
# entire notification delivery for ALL targets.
# Our _classify() extracts type/hostname from the message content.
# PVE stores body as base64 in the config file.
# {{ escape title/message }} -- JSON-safe escaping of quotes/newlines.
# {{ json fields }} -- renders ALL PVE metadata as a JSON object
# (type, hostname, job-id). This is a single Handlebars helper
# that always works, even if fields is empty (renders {}).
import base64
body_template = '{"title":"{{ title }}","message":"{{ message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}"}'
body_template = '{"title":"{{ escape title }}","message":"{{ escape message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}","fields":{{ json fields }}}'
body_b64 = base64.b64encode(body_template.encode()).decode()
endpoint_block = (

View File

@@ -457,6 +457,9 @@ class TaskWatcher:
self._thread: Optional[threading.Thread] = None
self._hostname = _hostname()
self._last_position = 0
# Set by NotificationManager to point at ProxmoxHookWatcher._delivered
# so we can skip events the webhook already delivered with richer data.
self._webhook_delivered: Optional[dict] = None
def start(self):
if self._running:
@@ -571,6 +574,19 @@ class TaskWatcher:
# Determine entity type from task type
entity = 'ct' if task_type.startswith('vz') else 'vm'
# ── Cross-source dedup: yield to PVE webhook for backup/replication ──
# The webhook delivers richer data (full logs, sizes, durations).
# If the webhook already delivered this event within 120s, skip.
_WEBHOOK_TYPES = {'backup_complete', 'backup_fail', 'backup_start',
'replication_complete', 'replication_fail'}
if event_type in _WEBHOOK_TYPES and self._webhook_delivered:
import time as _time
dedup_key = f"{event_type}:{vmid}"
last_webhook = self._webhook_delivered.get(dedup_key, 0)
if _time.time() - last_webhook < 120:
return # Webhook already delivered this with richer data
self._queue.put(NotificationEvent(
event_type, severity, data, source='tasks',
entity=entity, entity_id=vmid,
@@ -925,111 +941,100 @@ class ProxmoxHookWatcher:
def __init__(self, event_queue: Queue):
self._queue = event_queue
self._hostname = _hostname()
# Shared dict: TaskWatcher._webhook_delivered points here.
# Records (event_type:entity_id -> timestamp) for cross-source dedup.
self._delivered: Dict[str, float] = {}
def process_webhook(self, payload: dict) -> dict:
"""Process an incoming Proxmox webhook payload.
The PVE webhook is the PRIMARY source for vzdump, replication,
fencing, package-updates and system-mail events. PVE sends rich
detail (full logs, sizes, durations) that TaskWatcher cannot match.
Body template delivers:
{title, message, severity, timestamp, fields: {type, hostname, job-id}}
Returns: {'accepted': bool, 'event_type': str, 'event_id': str}
or {'accepted': False, 'error': str}
"""
if not payload:
return {'accepted': False, 'error': 'Empty payload'}
# ── Normalise PVE native webhook format ──
# PVE sends: {title, message, severity, timestamp, fields: {type, hostname, job-id}}
# Our code expects: {type, severity, title, body, component, ...}
# Flatten `fields` into the top-level payload so _classify sees them.
if 'fields' in payload and isinstance(payload['fields'], dict):
fields = payload['fields']
# Map PVE field names to our expected names
if 'type' in fields and 'type' not in payload:
payload['type'] = fields['type'] # vzdump, fencing, replication, etc.
if 'hostname' in fields:
payload['hostname'] = fields['hostname']
if 'job-id' in fields:
payload['job_id'] = fields['job-id']
# Merge remaining fields
for k, v in fields.items():
if k not in payload:
payload[k] = v
# ── Extract structured PVE fields ──
fields = payload.get('fields') or {}
if isinstance(fields, str):
# Edge case: {{ json fields }} rendered as string instead of dict
try:
import json
fields = json.loads(fields)
except (json.JSONDecodeError, ValueError):
fields = {}
# PVE uses 'message' for the body text
if 'message' in payload and 'body' not in payload:
payload['body'] = payload['message']
pve_type = fields.get('type', '').lower().strip()
pve_hostname = fields.get('hostname', self._hostname)
pve_job_id = fields.get('job-id', '')
# Extract common fields from PVE notification payload
notification_type = payload.get('type', payload.get('notification-type', ''))
severity_raw = payload.get('severity', payload.get('priority', 'info'))
title = payload.get('title', payload.get('subject', ''))
body = payload.get('body', payload.get('message', ''))
source_component = payload.get('component', payload.get('source', payload.get('type', '')))
title = payload.get('title', '')
message = payload.get('message', payload.get('body', ''))
severity_raw = payload.get('severity', 'info').lower().strip()
timestamp = payload.get('timestamp', '')
# If 'type' is already a known template key, use it directly.
# This allows tests and internal callers to inject events by exact type
# without relying on _classify's keyword heuristics.
from notification_templates import TEMPLATES as _TMPL
if notification_type in _TMPL:
event_type = notification_type
entity = payload.get('entity', 'node')
entity_id = payload.get('entity_id', payload.get('vmid', ''))
else:
# Map to our event taxonomy via heuristic classification
event_type, entity, entity_id = self._classify(
notification_type, source_component, title, body, payload
)
# ── Classify by PVE type (direct, no heuristics needed) ──
import re
event_type, entity, entity_id = self._classify_pve(
pve_type, severity_raw, title, message
)
# Discard meta-events (overall status changes, update status, etc.)
# Discard meta-events
if event_type == '_skip':
return {'accepted': False, 'skipped': True, 'reason': 'Meta-event filtered'}
severity = self._map_severity(severity_raw)
# Extract "reason" as extra detail, NOT the full body.
# Templates already have their own intro text (e.g. "{vmname} has failed.").
# The body from the webhook often starts with the same intro, so using the
# full body as {reason} causes duplication. We strip the first line (which
# is typically the title/summary) and keep only the extra detail lines.
reason = ''
if body:
body_lines = body.strip().split('\n')
# If more than 1 line, skip the first (summary) and use the rest
if len(body_lines) > 1:
reason = '\n'.join(body_lines[1:]).strip()[:500]
else:
# Single-line body: only use it as reason if it differs from title
if body.strip().lower() != (title or '').strip().lower():
reason = body.strip()[:500]
# ── Build rich data dict ──
# For webhook events, PVE's `message` IS the notification body.
# It contains full vzdump logs, package lists, error details, etc.
# We pass it as 'pve_message' so templates can use it directly.
data = {
'hostname': self._hostname,
'reason': reason,
'hostname': pve_hostname,
'pve_type': pve_type,
'pve_message': message,
'title': title,
'source_component': source_component,
'notification_type': notification_type,
'job_id': pve_job_id,
}
# Merge ALL extra fields from payload into data so template
# variables ({vmid}, {vmname}, {count}, {source_ip}, etc.) resolve.
_reserved = {'type', 'severity', 'priority', 'title', 'subject',
'body', 'message', 'component', 'source'}
for key, val in payload.items():
if key not in _reserved and key not in data:
data[key] = str(val) if val is not None else ''
# Skip event types that the TaskWatcher already captures with
# full details (VMID, VM name, size, duration, etc.).
# The PVE webhook sends inferior data for these (no VMID, no name).
# Only forward events that tasks/journal do NOT catch reliably.
_TASKS_HANDLED = {
'backup_complete', 'backup_fail', 'backup_start',
'snapshot_complete', 'snapshot_fail',
'vm_start', 'vm_stop', 'vm_restart', 'vm_shutdown',
'ct_start', 'ct_stop',
'migration_start', 'migration_complete', 'migration_fail',
'replication_complete', 'replication_fail',
}
if event_type in _TASKS_HANDLED:
return {'accepted': False, 'skipped': True,
'reason': f'tasks_watcher_handles_{event_type}'}
# Extract VMID and VM name from message for vzdump events
if pve_type == 'vzdump' and message:
# PVE vzdump messages contain lines like:
# "INFO: Starting Backup of VM 100 (qemu)"
# "VMID Name Status Time Size Filename"
# "100 arch-linux OK 00:05:30 1.2G /path/to/file"
vmids = re.findall(r'(?:VM|CT)\s+(\d+)', message, re.IGNORECASE)
if vmids:
data['vmid'] = vmids[0]
entity_id = vmids[0]
# Try to extract VM name from the table line
name_m = re.search(r'(\d+)\s+(\S+)\s+(?:OK|ERROR|WARNINGS)', message)
if name_m:
data['vmname'] = name_m.group(2)
# Extract size from "Total size: X"
size_m = re.search(r'Total size:\s*(.+?)(?:\n|$)', message)
if size_m:
data['size'] = size_m.group(1).strip()
# Extract duration from "Total running time: X"
dur_m = re.search(r'Total running time:\s*(.+?)(?:\n|$)', message)
if dur_m:
data['duration'] = dur_m.group(1).strip()
# Record in the shared delivered dict so TaskWatcher can dedup.
# TaskWatcher._webhook_delivered points at self._delivered (same object).
self._delivered[f"{event_type}:{entity_id}"] = time.time()
# Prune old entries periodically
if len(self._delivered) > 200:
cutoff = time.time() - 300
to_del = [k for k, v in self._delivered.items() if v < cutoff]
for k in to_del:
del self._delivered[k]
event = NotificationEvent(
event_type=event_type,
@@ -1044,50 +1049,28 @@ class ProxmoxHookWatcher:
self._queue.put(event)
return {'accepted': True, 'event_type': event_type, 'event_id': event.event_id}
def _classify(self, ntype: str, component: str, title: str,
body: str, payload: dict) -> tuple:
"""Classify webhook payload into (event_type, entity, entity_id).
def _classify_pve(self, pve_type: str, severity: str,
title: str, message: str) -> tuple:
"""Classify using PVE's structured fields.type.
Returns ('_skip', '', '') for meta-events we should discard.
Returns (event_type, entity, entity_id).
"""
title_lower = (title or '').lower()
body_lower = (body or '').lower()
component_lower = (component or '').lower()
# ── Skip PVE meta-events ──
# PVE sends "overall status changed from OK to WARNING" which is a meta
# aggregation event. Our own health monitor handles the underlying issues
# with better granularity, so we skip these to avoid noise/duplicates.
# Skip overall/updates status change meta-events
if 'overall' in title_lower and ('changed' in title_lower or 'status' in title_lower):
return '_skip', '', ''
# ── Skip "updates changed" status events ──
# PVE sends "updates status changed from OK to WARNING" when apt updates
# are available. Our PollingCollector already handles update checks with
# proper detail (security count, package list) on a 24h cycle.
if 'updates' in title_lower and ('changed' in title_lower or 'status' in title_lower):
return '_skip', '', ''
# ── PVE native notification types ──
# When PVE sends via our webhook body template, fields.type is one of:
# vzdump, fencing, replication, package-updates, system-mail
pve_type = payload.get('type', '').lower().strip()
# ── Direct classification by PVE type ──
if pve_type == 'vzdump':
# Backup notification -- determine success or failure from severity
pve_sev = payload.get('severity', 'info').lower()
vmid = ''
# Try to extract VMID from title like "Backup of VM 100 (qemu)"
import re
m = re.search(r'VM\s+(\d+)|CT\s+(\d+)', title, re.IGNORECASE)
if m:
vmid = m.group(1) or m.group(2) or ''
if pve_sev == 'error':
return 'backup_fail', 'vm', vmid
return 'backup_complete', 'vm', vmid
if severity in ('error', 'err'):
return 'backup_fail', 'vm', ''
return 'backup_complete', 'vm', ''
if pve_type == 'fencing':
return 'split_brain', 'node', payload.get('hostname', '')
return 'split_brain', 'node', ''
if pve_type == 'replication':
return 'replication_fail', 'vm', ''
@@ -1096,121 +1079,29 @@ class ProxmoxHookWatcher:
return 'update_available', 'node', ''
if pve_type == 'system-mail':
# Forwarded system mail (e.g. from smartd) -- treat as system_problem
return 'system_problem', 'node', ''
return 'system_mail', 'node', ''
# VM / CT lifecycle events (if sent via webhook)
vmid = str(payload.get('vmid', ''))
if any(k in component_lower for k in ('qemu', 'lxc', 'vm', 'ct', 'container')):
if any(w in title_lower for w in ('start', 'running')):
etype = 'ct_start' if 'lxc' in component_lower or 'container' in component_lower else 'vm_start'
return etype, 'vm', vmid
if any(w in title_lower for w in ('stop', 'shutdown', 'down')):
etype = 'ct_stop' if 'lxc' in component_lower or 'container' in component_lower else 'vm_stop'
return etype, 'vm', vmid
if 'fail' in title_lower or 'crash' in title_lower or 'error' in body_lower:
return 'vm_fail', 'vm', vmid
if 'migrat' in title_lower:
return 'migration_complete', 'vm', vmid
return 'vm_start', 'vm', vmid
# ── Fallback for unknown/empty pve_type ──
# (e.g. test notifications, future PVE event types)
msg_lower = (message or '').lower()
text = f"{title_lower} {msg_lower}"
# Also check title for VM keywords if component is not specific
if vmid or any(k in title_lower for k in ('vm ', 'ct ', 'qemu', 'lxc')):
if 'start' in title_lower:
return 'vm_start', 'vm', vmid
if any(w in title_lower for w in ('stop', 'shutdown')):
return 'vm_stop', 'vm', vmid
if 'fail' in title_lower or 'crash' in title_lower:
return 'vm_fail', 'vm', vmid
# Storage / SMART / ZFS / Ceph
if any(k in component_lower for k in ('smart', 'disk', 'zfs', 'ceph')):
entity_id = payload.get('device', payload.get('pool', ''))
if 'smart' in title_lower or 'smart' in body_lower:
return 'disk_io_error', 'disk', str(entity_id)
if 'zfs' in title_lower:
return 'disk_io_error', 'storage', str(entity_id)
return 'disk_space_low', 'storage', str(entity_id)
# Replication
if 'replication' in component_lower or 'replication' in title_lower:
vmid = str(payload.get('vmid', ''))
if 'fail' in title_lower or 'error' in body_lower:
return 'replication_fail', 'vm', vmid
return 'replication_complete', 'vm', vmid
# PBS (Proxmox Backup Server)
if 'pbs' in component_lower or 'backup' in component_lower:
vmid = str(payload.get('vmid', ''))
if 'fail' in title_lower or 'error' in body_lower:
return 'backup_fail', 'vm', vmid
if 'complete' in title_lower or 'success' in body_lower:
return 'backup_complete', 'vm', vmid
return 'backup_start', 'vm', vmid
# Cluster / HA / Fencing / Corosync
if any(k in component_lower for k in ('cluster', 'ha', 'fencing', 'corosync')):
node = str(payload.get('node', ''))
if 'quorum' in title_lower or 'split' in body_lower:
return 'split_brain', 'cluster', node
if 'fencing' in title_lower:
return 'node_disconnect', 'cluster', node
return 'node_disconnect', 'cluster', node
# APT / Updates
if 'apt' in component_lower or 'update' in title_lower:
return 'update_available', 'node', ''
# Network
if 'network' in component_lower:
return 'network_down', 'network', ''
# Security -- distinguish firewall from auth
if 'firewall' in component_lower or 'firewall' in title_lower:
return 'firewall_issue', 'node', ''
if any(k in component_lower for k in ('auth', 'security', 'pam', 'sshd')):
return 'auth_fail', 'user', ''
# ── Text-based heuristics (for proxmox_hook where component is empty) ──
# Scan title + body for known keywords regardless of component.
text = f"{title_lower} {body_lower}"
# Backup / vzdump
if 'vzdump' in text or 'backup' in text:
import re
m = re.search(r'(?:vm|ct|vmid)\s*(\d+)', text, re.IGNORECASE)
m = re.search(r'(?:vm|ct)\s+(\d+)', text, re.IGNORECASE)
vmid = m.group(1) if m else ''
if any(w in text for w in ('fail', 'error', 'could not', 'unable')):
if any(w in text for w in ('fail', 'error')):
return 'backup_fail', 'vm', vmid
if any(w in text for w in ('success', 'complete', 'finished', 'ok')):
return 'backup_complete', 'vm', vmid
return 'backup_start', 'vm', vmid
return 'backup_complete', 'vm', vmid
# Mail bounces
if 'could not be delivered' in text or 'mail system' in text or 'postmaster' in text:
return 'system_problem', 'node', ''
# Disk / I/O
if any(w in text for w in ('i/o error', 'disk error', 'smart', 'bad sector', 'read error')):
return 'disk_io_error', 'disk', ''
# Auth
if any(w in text for w in ('authentication fail', 'login fail', 'unauthorized', 'permission denied')):
return 'auth_fail', 'user', ''
# Service failures
if any(w in text for w in ('service fail', 'failed to start', 'exited with error')):
return 'service_fail', 'node', ''
# Replication
if 'replication' in text:
if 'fail' in text or 'error' in text:
return 'replication_fail', 'vm', ''
return 'replication_complete', 'vm', ''
return 'replication_fail', 'vm', ''
# Fallback: system_problem generic
# Generic fallback
return 'system_problem', 'node', ''
# Old _classify removed -- replaced by _classify_pve above.
@staticmethod
def _map_severity(raw: str) -> str:
raw_l = str(raw).lower()

View File

@@ -419,6 +419,14 @@ class NotificationManager:
self._task_watcher = TaskWatcher(self._event_queue)
self._polling_collector = PollingCollector(self._event_queue)
# Create hook_watcher eagerly so we can wire the cross-source dedup.
# TaskWatcher._webhook_delivered points at ProxmoxHookWatcher._delivered
# so TaskWatcher can skip backup/replication events the webhook already
# delivered with richer data (full logs, sizes, durations).
if not self._hook_watcher:
self._hook_watcher = ProxmoxHookWatcher(self._event_queue)
self._task_watcher._webhook_delivered = self._hook_watcher._delivered
self._journal_watcher.start()
self._task_watcher.start()
self._polling_collector.start()

View File

@@ -302,6 +302,12 @@ TEMPLATES = {
'group': 'system',
'default_enabled': True,
},
'system_mail': {
'title': '{hostname}: System mail notification',
'body': '{reason}',
'group': 'system',
'default_enabled': True,
},
'service_fail': {
'title': '{hostname}: Service failed - {service_name}',
'body': 'Service {service_name} has failed.\n{reason}',