update notification service

This commit is contained in:
MacRimi
2026-03-02 23:21:40 +01:00
parent 18c6455837
commit f0b8ed20a2
9 changed files with 298 additions and 213 deletions

View File

@@ -115,6 +115,14 @@ class JournalWatcher:
# 24h anti-cascade for disk I/O + filesystem errors (keyed by device name)
self._disk_io_notified: Dict[str, float] = {}
self._DISK_IO_COOLDOWN = 86400 # 24 hours
# Track when the last full backup job notification was sent
# so we can suppress per-guest "Starting Backup of VM ..." noise
self._last_backup_job_ts: float = 0
self._BACKUP_JOB_SUPPRESS_WINDOW = 7200 # 2h: suppress per-guest during active job
# NOTE: Service failure batching is handled universally by
# BurstAggregator in NotificationManager (AGGREGATION_RULES).
def start(self):
"""Start the journal watcher thread."""
@@ -521,42 +529,26 @@ class JournalWatcher:
match = re.search(pattern, msg)
if match:
service_name = match.group(1)
data = {
'service_name': service_name,
'reason': msg[:300],
'hostname': self._hostname,
}
display_name = service_name
# Enrich PVE VM/CT services with guest name and context
# pve-container@101 -> LXC container 101
# qemu-server@100 -> QEMU VM 100
pve_match = re.match(
r'(pve-container|qemu-server)@(\d+)', service_name)
if pve_match:
svc_type = pve_match.group(1)
vmid = pve_match.group(2)
vm_name = self._resolve_vm_name(vmid)
if svc_type == 'pve-container':
guest_type = 'LXC container'
else:
guest_type = 'QEMU VM'
display = f"{guest_type} {vmid}"
if vm_name:
display = f"{guest_type} {vmid} ({vm_name})"
data['service_name'] = service_name
data['vmid'] = vmid
data['vmname'] = vm_name
data['guest_type'] = guest_type
data['display_name'] = display
data['reason'] = (
f"{display} failed to start.\n{msg[:300]}"
)
guest_type = 'CT' if svc_type == 'pve-container' else 'VM'
display_name = f"{guest_type} {vm_name} ({vmid})" if vm_name else f"{guest_type} {vmid}"
self._emit('service_fail', 'WARNING', data,
entity='node', entity_id=service_name)
# Emit directly -- the BurstAggregator in NotificationManager
# will automatically batch multiple service failures that
# arrive within the aggregation window (90s).
self._emit('service_fail', 'WARNING', {
'service_name': display_name,
'reason': msg[:300],
'hostname': self._hostname,
}, entity='node', entity_id=service_name)
return
def _resolve_vm_name(self, vmid: str) -> str:
@@ -765,11 +757,17 @@ class JournalWatcher:
# Fallback: vzdump also emits per-guest messages like:
# "INFO: Starting Backup of VM 104 (lxc)"
# These fire for EACH guest when a multi-guest vzdump job runs.
# We only use this if the primary pattern didn't match.
# We SUPPRESS these when a full backup job was recently notified
# (within 2h window) to avoid spamming one notification per guest.
# Only use fallback for standalone single-VM backups (manual, no job).
fallback_guest = None
if not match:
fb = re.match(r'(?:INFO:\s*)?Starting Backup of VM (\d+)\s+\((lxc|qemu)\)', msg)
if fb:
# If a full job notification was sent recently, suppress per-guest noise
now = time.time()
if now - self._last_backup_job_ts < self._BACKUP_JOB_SUPPRESS_WINDOW:
return # Part of an active job -- already notified
fallback_guest = fb.group(1)
else:
return
@@ -809,16 +807,21 @@ class JournalWatcher:
if guests:
if guests == ['all']:
reason_parts.append('Guests: All VMs/CTs')
reason_parts.append('VM/CT: All')
else:
guest_lines = []
for gid in guests:
gname = self._resolve_vm_name(gid)
if gname:
guest_lines.append(f' {gname} ({gid})')
# Skip non-guest IDs (0, 1 are not real guests)
if gid in ('0', '1'):
continue
info = self._resolve_vm_info(gid)
if info:
gname, gtype = info
guest_lines.append(f' {gtype} {gname} ({gid})')
else:
guest_lines.append(f' ID {gid}')
reason_parts.append('Guests:\n' + '\n'.join(guest_lines))
if guest_lines:
reason_parts.append('VM/CT:\n' + '\n'.join(guest_lines))
details = []
if storage:
@@ -837,6 +840,11 @@ class JournalWatcher:
# dedup each other, while the SAME job doesn't fire twice.
guest_key = '_'.join(sorted(guests)) if guests else 'unknown'
# If this was a full job (primary pattern), record timestamp to
# suppress subsequent per-guest "Starting Backup of VM" messages
if match:
self._last_backup_job_ts = time.time()
self._emit('backup_start', 'INFO', {
'vmid': ', '.join(guests),
'vmname': '',
@@ -847,18 +855,33 @@ class JournalWatcher:
def _resolve_vm_name(self, vmid: str) -> str:
    """Resolve a VMID to its guest name, or '' when unknown.

    Thin wrapper around _resolve_vm_info() kept for callers that only
    need the name and do not care about the guest type.
    """
    info = self._resolve_vm_info(vmid)
    return info[0] if info else ''

def _resolve_vm_info(self, vmid: str):
    """Resolve a VMID to (name, type) from PVE config files.

    Returns tuple (name, 'VM'|'CT') or None if not found.
    The guest type is determined by which config directory the ID
    was found in:
        /etc/pve/qemu-server -> VM
        /etc/pve/lxc         -> CT
    """
    # Guard against empty / non-numeric IDs before touching the filesystem.
    if not vmid or not vmid.isdigit():
        return None
    type_map = [
        ('/etc/pve/qemu-server', 'VM'),
        ('/etc/pve/lxc', 'CT'),
    ]
    for base, gtype in type_map:
        conf = f'{base}/{vmid}.conf'
        try:
            with open(conf, 'r') as f:
                for line in f:
                    # Guest name line is 'name: X' or 'hostname: X';
                    # both prefixes are accepted in either directory.
                    if line.startswith('name:') or line.startswith('hostname:'):
                        return (line.split(':', 1)[1].strip(), gtype)
        except (OSError, IOError):
            # Config not present on this node (or unreadable) -- try next dir.
            pass
    return None
def _check_cluster_events(self, msg: str, syslog_id: str):
"""Detect cluster split-brain and node disconnect."""

View File

@@ -126,13 +126,23 @@ class GroupRateLimiter:
# Per-event-type aggregation rules for the BurstAggregator.
#   window     -- seconds that events of the same type/host are buffered
#   min_count  -- minimum events inside the window to trigger a burst summary
#   burst_type -- event type used to pick the summary notification template
AGGREGATION_RULES = {
    'auth_fail': {'window': 120, 'min_count': 3, 'burst_type': 'burst_auth_fail'},
    'ip_block': {'window': 120, 'min_count': 3, 'burst_type': 'burst_ip_block'},
    'disk_io_error': {'window': 60, 'min_count': 3, 'burst_type': 'burst_disk_io'},
    'split_brain': {'window': 300, 'min_count': 2, 'burst_type': 'burst_cluster'},
    'node_disconnect': {'window': 300, 'min_count': 2, 'burst_type': 'burst_cluster'},
    'service_fail': {'window': 90, 'min_count': 2, 'burst_type': 'burst_service_fail'},
    'service_fail_batch': {'window': 90, 'min_count': 2, 'burst_type': 'burst_service_fail'},
    'system_problem': {'window': 90, 'min_count': 2, 'burst_type': 'burst_system'},
    'oom_kill': {'window': 60, 'min_count': 2, 'burst_type': 'burst_generic'},
    'firewall_issue': {'window': 60, 'min_count': 2, 'burst_type': 'burst_generic'},
}

# Default catch-all rule for any event type NOT listed above.
# This ensures that even unlisted event types get grouped when they
# burst, avoiding notification floods from any source.
_DEFAULT_AGGREGATION = {'window': 60, 'min_count': 2, 'burst_type': 'burst_generic'}
class BurstAggregator:
"""Accumulates similar events in a time window, then sends a single summary.
@@ -150,11 +160,13 @@ class BurstAggregator:
def ingest(self, event: NotificationEvent) -> Optional[NotificationEvent]:
"""Add event to aggregation. Returns:
- None if event is being buffered (wait for window)
- Original event if not eligible for aggregation
- Original event if first in its bucket (sent immediately)
ALL event types are aggregated: specific rules from AGGREGATION_RULES
take priority, otherwise the _DEFAULT_AGGREGATION catch-all applies.
This prevents notification floods from any source.
"""
rule = AGGREGATION_RULES.get(event.event_type)
if not rule:
return event # Not aggregable, pass through
rule = AGGREGATION_RULES.get(event.event_type, _DEFAULT_AGGREGATION)
bucket_key = f"{event.event_type}:{event.data.get('hostname', '')}"
@@ -202,7 +214,11 @@ class BurstAggregator:
def _create_summary(self, events: List[NotificationEvent],
rule: dict) -> Optional[NotificationEvent]:
"""Create a single summary event from multiple events."""
"""Create a single summary event from multiple events.
Includes individual detail lines so the grouped message is
self-contained and the user can see exactly what happened.
"""
if not events:
return None
@@ -226,12 +242,32 @@ class BurstAggregator:
burst_type = rule.get('burst_type', 'burst_generic')
# Build detail lines from individual events.
# For each event we extract the most informative field to show
# a concise one-line summary (e.g. "- service_fail: pvestatd").
detail_lines = []
for ev in events[1:]: # Skip first (already sent individually)
line = self._summarize_event(ev)
if line:
detail_lines.append(f" - {line}")
# Cap detail lines to avoid extremely long messages
details = ''
if detail_lines:
if len(detail_lines) > 15:
shown = detail_lines[:15]
shown.append(f" ... +{len(detail_lines) - 15} more")
details = '\n'.join(shown)
else:
details = '\n'.join(detail_lines)
data = {
'hostname': first.data.get('hostname', socket.gethostname()),
'count': str(len(events)),
'window': window_str,
'entity_list': entity_list,
'event_type': first.event_type,
'details': details,
}
return NotificationEvent(
@@ -242,6 +278,37 @@ class BurstAggregator:
entity=first.entity,
entity_id='burst',
)
@staticmethod
def _summarize_event(event: NotificationEvent) -> str:
    """Build a concise one-line description of *event* for burst summaries.

    Preference order: service name, first line of the reason text,
    username, source IP, VM/CT id (+name), entity id, bare event type.
    """
    data = event.data
    etype = event.event_type

    # Service failures are identified by their service name.
    if etype in ('service_fail', 'service_fail_batch'):
        return data.get('service_name', data.get('display_name', etype))

    # Generic problems: first line of the reason, capped at 120 chars.
    if 'reason' in data:
        first_line, _, _ = data['reason'].partition('\n')
        return first_line[:120]

    # Authentication / firewall events carry a username or a source IP.
    for key in ('username', 'ip'):
        if key in data:
            return f"{etype}: {data[key]}"

    # Guest events: "<etype>: <name> (<vmid>)" when the name is known.
    if 'vmid' in data:
        vmname = data.get('vmname', '')
        if vmname:
            return f"{etype}: {vmname} ({data['vmid']})"
        return f"{etype}: {data['vmid']}"

    # Last resort: entity id, else just the event type.
    return f"{etype}: {event.entity_id}" if event.entity_id else etype
# ─── Notification Manager ─────────────────────────────────────────

View File

@@ -73,6 +73,12 @@ def _parse_vzdump_message(message: str) -> Optional[Dict[str, Any]]:
filename = padded[col_starts[5]:].strip()
if vmid and vmid.isdigit():
# Infer type from filename (vzdump-lxc-NNN or vzdump-qemu-NNN)
vm_type = ''
if 'lxc' in filename:
vm_type = 'lxc'
elif 'qemu' in filename:
vm_type = 'qemu'
vms.append({
'vmid': vmid,
'name': name,
@@ -80,6 +86,7 @@ def _parse_vzdump_message(message: str) -> Optional[Dict[str, Any]]:
'time': time_val,
'size': size,
'filename': filename,
'type': vm_type,
})
# ── Strategy 2: log-style (PBS / Proxmox Backup Server) ──
@@ -235,22 +242,49 @@ def _format_vzdump_body(parsed: Dict[str, Any], is_success: bool) -> str:
status = vm.get('status', '').lower()
icon = '\u2705' if status == 'ok' else '\u274C'
parts.append(f"{icon} ID {vm['vmid']} ({vm['name']})")
# Determine VM/CT type prefix
vm_type = vm.get('type', '')
if vm_type == 'lxc':
prefix = 'CT'
elif vm_type == 'qemu':
prefix = 'VM'
else:
# Try to infer from filename (vzdump-lxc-NNN or vzdump-qemu-NNN)
fname = vm.get('filename', '')
if 'lxc' in fname or fname.startswith('ct/'):
prefix = 'CT'
elif 'qemu' in fname or fname.startswith('vm/'):
prefix = 'VM'
else:
prefix = ''
details = []
# Format: "VM Name (ID)" or "CT Name (ID)" -- name first
name = vm.get('name', '')
vmid = vm.get('vmid', '')
if prefix and name:
parts.append(f"{icon} {prefix} {name} ({vmid})")
elif name:
parts.append(f"{icon} {name} ({vmid})")
else:
parts.append(f"{icon} ID {vmid}")
# Size and Duration on same line
detail_line = []
if vm.get('size'):
details.append(f"Size: {vm['size']}")
detail_line.append(f"Size: {vm['size']}")
if vm.get('time'):
details.append(f"Duration: {vm['time']}")
detail_line.append(f"Duration: {vm['time']}")
if detail_line:
parts.append(' | '.join(detail_line))
# PBS/File on separate line
if vm.get('filename'):
fname = vm['filename']
# PBS archives look like "ct/100/2026-..." or "vm/105/2026-..."
if re.match(r'^(?:ct|vm)/\d+/', fname):
details.append(f"PBS: {fname}")
parts.append(f"PBS: {fname}")
else:
details.append(f"File: {fname}")
if details:
parts.append(' | '.join(details))
parts.append(f"File: {fname}")
parts.append('') # blank line between VMs
# Summary
@@ -583,6 +617,12 @@ TEMPLATES = {
'group': 'system',
'default_enabled': True,
},
'service_fail_batch': {
'title': '{hostname}: {service_count} services failed',
'body': '{reason}',
'group': 'system',
'default_enabled': True,
},
'system_mail': {
'title': '{hostname}: {pve_title}',
'body': '{reason}',
@@ -683,9 +723,21 @@ TEMPLATES = {
'group': 'cluster',
'default_enabled': True,
},
'burst_service_fail': {
'title': '{hostname}: {count} services failed in {window}',
'body': '{count} service failures detected in {window}.\nThis typically indicates a node reboot or PVE service restart.\n\nAdditional failures:\n{details}',
'group': 'system',
'default_enabled': True,
},
'burst_system': {
'title': '{hostname}: {count} system problems in {window}',
'body': '{count} system problems detected in {window}.\n\nAdditional issues:\n{details}',
'group': 'system',
'default_enabled': True,
},
'burst_generic': {
'title': '{hostname}: {count} {event_type} events in {window}',
'body': '{count} events of type {event_type} in {window}.\n{entity_list}',
'body': '{count} events of type {event_type} in {window}.\n\nAdditional events:\n{details}',
'group': 'system',
'default_enabled': True,
},