Files
ProxMenux/AppImage/scripts/notification_templates.py
2026-03-17 14:07:47 +01:00

1467 lines
55 KiB
Python

"""
ProxMenux Notification Templates
Message templates for all event types with per-channel formatting.
Templates use Python str.format() variables:
{hostname}, {severity}, {category}, {reason}, {summary},
{previous}, {current}, {vmid}, {vmname}, {timestamp}, etc.
Optional AI enhancement enriches messages with context/suggestions.
Author: MacRimi
"""
import json
import re
import socket
import time
import urllib.request
import urllib.error
from typing import Dict, Any, Optional, List
# ─── vzdump message parser ───────────────────────────────────────
def _parse_vzdump_table(lines: List[str]) -> List[Dict[str, str]]:
    """Parse the classic vzdump result table (local/NFS/CIFS storage).

    Returns one dict per guest row, or an empty list when no usable
    ``VMID Name Status Time Size Filename`` header is present.
    """
    header_idx = -1
    for i, line in enumerate(lines):
        if re.match(r'\s*VMID\s+Name\s+Status', line, re.IGNORECASE):
            header_idx = i
            break
    if header_idx < 0:
        return []

    # Column start offsets are taken from the header and used to slice rows.
    header = lines[header_idx]
    col_starts = []
    for col_name in ('VMID', 'Name', 'Status', 'Time', 'Size', 'Filename'):
        idx = header.find(col_name)
        if idx >= 0:
            col_starts.append(idx)
    if len(col_starts) != 6:
        return []

    vms: List[Dict[str, str]] = []
    for line in lines[header_idx + 1:]:
        stripped = line.strip()
        # The table ends at the first blank line, totals line or separator.
        if not stripped or stripped.startswith(('Total', 'Logs', '=')):
            break
        # Pad short rows so every column slice is well defined.
        padded = line.ljust(col_starts[-1] + 50)
        vmid = padded[col_starts[0]:col_starts[1]].strip()
        if not (vmid and vmid.isdigit()):
            continue
        filename = padded[col_starts[5]:].strip()
        # Infer guest type from the archive name
        # (vzdump-lxc-NNN or vzdump-qemu-NNN).
        if 'lxc' in filename:
            vm_type = 'lxc'
        elif 'qemu' in filename:
            vm_type = 'qemu'
        else:
            vm_type = ''
        vms.append({
            'vmid': vmid,
            'name': padded[col_starts[1]:col_starts[2]].strip(),
            'status': padded[col_starts[2]:col_starts[3]].strip(),
            'time': padded[col_starts[3]:col_starts[4]].strip(),
            'size': padded[col_starts[4]:col_starts[5]].strip(),
            'filename': filename,
            'type': vm_type,
        })
    return vms


def _parse_vzdump_log(lines: List[str]) -> List[Dict[str, str]]:
    """Parse log-style vzdump output (PBS / Proxmox Backup Server).

    Recognised lines:
      "Starting Backup of VM NNN (lxc/qemu)"  -> new guest
      "CT Name: xxx" / "VM Name: xxx"         -> guest name
      "creating ... archive 'ct/100/...'"     -> PBS archive path
      "root.pxar: had to backup X of Y"       -> size (CT)
      "transferred X in N seconds"            -> size (QEMU)
      "Finished Backup of VM NNN (HH:MM:SS)"  -> duration, status=ok
      "ERROR: ..." / "TASK ERROR ..."         -> status=error + reason
    """
    vms: List[Dict[str, str]] = []
    current_vm: Optional[Dict[str, str]] = None
    for line in lines:
        stripped = line.strip()
        # Remove the "INFO: " style prefix that PVE adds to each log line.
        clean = re.sub(r'^(?:INFO|WARNING|ERROR):\s*', '', stripped)
        # The error test must look at the line BEFORE the prefix is removed,
        # otherwise "ERROR: ..." lines can never be recognised.
        is_error_line = (stripped.startswith('ERROR:')
                         or clean.startswith(('ERROR:', 'TASK ERROR')))

        m_start = re.match(
            r'Starting Backup of VM (\d+)\s+\((lxc|qemu)\)', clean)
        if m_start:
            # A guest that never reached "Finished" is flushed here.
            if current_vm:
                vms.append(current_vm)
            current_vm = {
                'vmid': m_start.group(1),
                'name': '',
                'status': 'ok',
                'time': '',
                'size': '',
                'filename': '',
                'type': m_start.group(2),
            }
            continue

        if not current_vm:
            continue

        m_name = re.match(r'(?:CT|VM) Name:\s*(.+)', clean)
        if m_name:
            current_vm['name'] = m_name.group(1).strip()
            continue

        m_archive = re.search(r"creating .+ archive '([^']+)'", clean)
        if m_archive:
            current_vm['filename'] = m_archive.group(1)
            continue

        m_pxar = re.search(r'root\.pxar:.*?of\s+([\d.]+\s+\S+)', clean)
        if m_pxar:
            current_vm['size'] = m_pxar.group(1)
            continue

        m_transfer = re.search(r'transferred\s+([\d.]+\s+\S+)', clean)
        if m_transfer:
            current_vm['size'] = m_transfer.group(1)
            continue

        m_finish = re.match(
            r'Finished Backup of VM (\d+)\s+\(([^)]+)\)', clean)
        if m_finish:
            current_vm['time'] = m_finish.group(2)
            current_vm['status'] = 'ok'
            vms.append(current_vm)
            current_vm = None
            continue

        if is_error_line:
            current_vm['status'] = 'error'
            # Keep the first error text so the formatter can show a reason.
            current_vm.setdefault('error', clean)

    # Don't forget the last guest if it never logged "Finished".
    if current_vm:
        vms.append(current_vm)
    return vms


def _sum_vzdump_sizes(vms: List[Dict[str, str]]) -> str:
    """Add up per-guest sizes (GiB/MiB/TiB) into one display string, or ''."""
    sizes_gib = 0.0
    for vm in vms:
        m = re.match(r'([\d.]+)\s+(.*)', vm.get('size', ''))
        if not m:
            continue
        try:
            val = float(m.group(1))
        except ValueError:
            # "[\d.]+" also matches things like "1.2.3"; skip, don't crash.
            continue
        unit = m.group(2).strip().upper()
        if 'GIB' in unit or 'GB' in unit:
            sizes_gib += val
        elif 'MIB' in unit or 'MB' in unit:
            sizes_gib += val / 1024
        elif 'TIB' in unit or 'TB' in unit:
            sizes_gib += val * 1024
    if sizes_gib <= 0:
        return ''
    if sizes_gib >= 1024:
        return f"{sizes_gib / 1024:.3f} TiB"
    if sizes_gib >= 1:
        return f"{sizes_gib:.3f} GiB"
    return f"{sizes_gib * 1024:.3f} MiB"


def _sum_vzdump_durations(vms: List[Dict[str, str]]) -> str:
    """Add up per-guest HH:MM:SS durations into e.g. '1h 2m 3s', or ''."""
    total_secs = 0
    for vm in vms:
        m = re.match(r'(\d+):(\d+):(\d+)', vm.get('time', ''))
        if m:
            total_secs += (int(m.group(1)) * 3600
                           + int(m.group(2)) * 60
                           + int(m.group(3)))
    if total_secs <= 0:
        return ''
    hours, remainder = divmod(total_secs, 3600)
    mins, secs = divmod(remainder, 60)
    if hours:
        return f"{hours}h {mins}m {secs}s"
    if mins:
        return f"{mins}m {secs}s"
    return f"{secs}s"


def _parse_vzdump_message(message: str) -> Optional[Dict[str, Any]]:
    """Parse a PVE vzdump notification message into structured data.

    Supports two formats:
      1. Local storage: table with columns
         VMID Name Status Time Size Filename
      2. PBS storage: log-style output with
         'Finished Backup of VM NNN (HH:MM:SS)' and sizes in lines like
         'root.pxar: had to backup X of Y' or 'transferred X'

    The table format is tried first; the log format is only used when the
    table yields no guests.

    Args:
        message: Raw notification text as sent by PVE.

    Returns:
        A dict with 'vms' (list of per-guest dicts with keys vmid, name,
        status, time, size, filename, type and -- for failed guests parsed
        from logs -- error), 'total_time', 'total_size' and 'vm_count';
        or None when the message is empty or nothing could be extracted.
    """
    if not message:
        return None

    lines = message.split('\n')
    vms = _parse_vzdump_table(lines) or _parse_vzdump_log(lines)

    # Explicit totals printed by vzdump win over computed ones.
    total_time = ''
    total_size = ''
    for line in lines:
        m_time = re.search(r'Total running time:\s*(.+)', line)
        if m_time:
            total_time = m_time.group(1).strip()
        m_size = re.search(r'Total size:\s*(.+)', line)
        if m_size:
            total_size = m_size.group(1).strip()

    # For PBS: derive totals from the individual guests when not stated.
    if not total_size and vms:
        total_size = _sum_vzdump_sizes(vms)
    if not total_time and vms:
        total_time = _sum_vzdump_durations(vms)

    if not vms and not total_size:
        return None
    return {
        'vms': vms,
        'total_time': total_time,
        'total_size': total_size,
        'vm_count': len(vms),
    }
def _format_vzdump_body(parsed: Dict[str, Any], is_success: bool) -> str:
    """Render parsed vzdump data as a compact, chat-friendly text body.

    Each guest contributes a headline (status icon, CT/VM prefix, name, id),
    an optional size/duration line, an optional archive line, an optional
    error line and a blank separator. A one-line summary follows.

    Args:
        parsed: Result of ``_parse_vzdump_message``.
        is_success: Accepted for interface compatibility; the per-guest
            status decides the icons, so this flag is not consulted.

    Returns:
        The formatted multi-line body.
    """
    guests = parsed.get('vms', [])
    out = []

    for guest in guests:
        state = guest.get('status', '').lower()
        mark = '\u2705' if state == 'ok' else '\u274C'

        # CT/VM prefix: explicit type first, archive name as a fallback.
        guest_type = guest.get('type', '')
        if guest_type == 'lxc':
            kind = 'CT'
        elif guest_type == 'qemu':
            kind = 'VM'
        else:
            archive = guest.get('filename', '')
            if 'lxc' in archive or archive.startswith('ct/'):
                kind = 'CT'
            elif 'qemu' in archive or archive.startswith('vm/'):
                kind = 'VM'
            else:
                kind = ''

        # Headline puts the name first: "VM Name (ID)" / "CT Name (ID)".
        label = guest.get('name', '')
        ident = guest.get('vmid', '')
        if not label:
            headline = f"{mark} ID {ident}"
        elif kind:
            headline = f"{mark} {kind} {label} ({ident})"
        else:
            headline = f"{mark} {label} ({ident})"
        out.append(headline)

        # Size and duration share one line.
        metrics = []
        if guest.get('size'):
            metrics.append(f"\U0001F4CF Size: {guest['size']}")
        if guest.get('time'):
            metrics.append(f"\u23F1\uFE0F Duration: {guest['time']}")
        if metrics:
            out.append(' | '.join(metrics))

        # Archive location: PBS snapshot path vs. plain dump file.
        if guest.get('filename'):
            target = guest['filename']
            is_pbs_path = re.match(r'^(?:ct|vm)/\d+/', target)
            if is_pbs_path:
                out.append(f"\U0001F5C4\uFE0F PBS: {target}")
            else:
                out.append(f"\U0001F4C1 File: {target}")

        # Failure reason, when one was recorded.
        if state != 'ok' and guest.get('error'):
            out.append(f"\u26A0\uFE0F {guest['error']}")

        out.append('')  # blank line between guests

    # Summary line.
    total_guests = parsed.get('vm_count', 0)
    if total_guests > 0 or parsed.get('total_size'):
        succeeded = 0
        for guest in guests:
            if guest.get('status', '').lower() == 'ok':
                succeeded += 1
        failed = total_guests - succeeded

        summary = []
        if total_guests:
            summary.append(f"\U0001F4CA {total_guests} backups")
        if failed:
            summary.append(f"\u274C {failed} failed")
        if parsed.get('total_size'):
            summary.append(f"\U0001F4E6 Total: {parsed['total_size']}")
        if parsed.get('total_time'):
            summary.append(f"\u23F1\uFE0F Time: {parsed['total_time']}")
        if summary:
            out.append(' | '.join(summary))

    return '\n'.join(out)
# ─── Severity Icons ──────────────────────────────────────────────
# Severity name -> coloured-circle emoji. enrich_with_emojis() uses this map
# as the last-resort title icon and to detect a title that already starts
# with a severity circle.
SEVERITY_ICONS = {
    'CRITICAL': '\U0001F534',  # red circle
    'WARNING': '\U0001F7E1',   # yellow circle
    'INFO': '\U0001F535',      # blue circle
    'OK': '\U0001F7E2',        # green circle
    'UNKNOWN': '\u26AA',       # white circle
}
# Same severities as ':shortcode:' strings.
# NOTE(review): presumably consumed by the Discord channel -- the consumer is
# not in this part of the file; confirm.
SEVERITY_ICONS_DISCORD = {
    'CRITICAL': ':red_circle:',
    'WARNING': ':yellow_circle:',
    'INFO': ':blue_circle:',
    'OK': ':green_circle:',
    'UNKNOWN': ':white_circle:',
}
# ─── Event Templates ─────────────────────────────────────────────
# Each template has a 'title' and 'body' with {variable} placeholders.
# 'group' is used for UI event filter grouping.
# 'default_enabled' controls initial state in settings.
# Registry of notification templates, keyed by event type.
#
# Entry fields:
#   title / body     str.format() templates filled in by render_template();
#                    callers should supply every placeholder a template uses.
#   label            text shown for the event by get_event_types_by_group().
#   group            key into EVENT_GROUPS.
#   default_enabled  initial on/off state (see get_default_enabled_events()).
#   hidden           optional; True keeps the entry out of
#                    get_event_types_by_group() while it remains renderable.
TEMPLATES = {
    # ── Health Monitor state changes ──
    # NOTE: state_change is disabled by default -- it fires on every
    # status oscillation (OK->WARNING->OK) which creates noise.
    # The health_persistent and new_error templates cover this better.
    'state_change': {
        'title': '{hostname}: {category} changed to {current}',
        'body': '{category} status changed from {previous} to {current}.\n{reason}',
        'label': 'Health state changed',
        'group': 'health',
        'default_enabled': False,
    },
    'new_error': {
        'title': '{hostname}: New {severity} - {category}',
        'body': '{reason}',
        'label': 'New health issue',
        'group': 'health',
        'default_enabled': True,
    },
    'error_resolved': {
        'title': '{hostname}: Resolved - {category}',
        'body': 'The {category} issue has been resolved.\n{reason}\n\U0001F6A6 Previous severity: {original_severity}\n\u23F1\uFE0F Duration: {duration}',
        'label': 'Recovery notification',
        'group': 'health',
        'default_enabled': True,
    },
    'error_escalated': {
        'title': '{hostname}: Escalated to {severity} - {category}',
        'body': '{reason}',
        'label': 'Health issue escalated',
        'group': 'health',
        'default_enabled': True,
    },
    'health_degraded': {
        'title': '{hostname}: Health check degraded',
        'body': '{reason}',
        'label': 'Health check degraded',
        'group': 'health',
        'default_enabled': True,
    },
    # ── VM / CT events ──
    'vm_start': {
        'title': '{hostname}: VM {vmid} started',
        'body': '{vmname} ({vmid}) has been started.',
        'label': 'VM started',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'vm_stop': {
        'title': '{hostname}: VM {vmid} stopped',
        'body': '{vmname} ({vmid}) has been stopped.',
        'label': 'VM stopped',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'vm_shutdown': {
        'title': '{hostname}: VM {vmid} shutdown',
        'body': '{vmname} ({vmid}) has been shut down.',
        'label': 'VM shutdown',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'vm_fail': {
        'title': '{hostname}: VM {vmid} FAILED',
        'body': '{vmname} ({vmid}) has failed.\n{reason}',
        'label': 'VM FAILED',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'vm_restart': {
        'title': '{hostname}: VM {vmid} restarted',
        'body': '{vmname} ({vmid}) has been restarted.',
        'label': 'VM restarted',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_start': {
        'title': '{hostname}: CT {vmid} started',
        'body': '{vmname} ({vmid}) has been started.',
        'label': 'CT started',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'ct_stop': {
        'title': '{hostname}: CT {vmid} stopped',
        'body': '{vmname} ({vmid}) has been stopped.',
        'label': 'CT stopped',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_shutdown': {
        'title': '{hostname}: CT {vmid} shutdown',
        'body': '{vmname} ({vmid}) has been shut down.',
        'label': 'CT shutdown',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_restart': {
        'title': '{hostname}: CT {vmid} restarted',
        'body': '{vmname} ({vmid}) has been restarted.',
        'label': 'CT restarted',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_fail': {
        'title': '{hostname}: CT {vmid} FAILED',
        'body': '{vmname} ({vmid}) has failed.\n{reason}',
        'label': 'CT FAILED',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_start': {
        'title': '{hostname}: Migration started - {vmid}',
        'body': '{vmname} ({vmid}) migration to {target_node} started.',
        'label': 'Migration started',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_complete': {
        'title': '{hostname}: Migration complete - {vmid}',
        'body': '{vmname} ({vmid}) migrated successfully to {target_node}.',
        'label': 'Migration complete',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_fail': {
        'title': '{hostname}: Migration FAILED - {vmid}',
        'body': '{vmname} ({vmid}) migration to {target_node} failed.\n{reason}',
        'label': 'Migration FAILED',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'replication_fail': {
        'title': '{hostname}: Replication FAILED - {vmid}',
        'body': 'Replication of {vmname} ({vmid}) has failed.\n{reason}',
        'label': 'Replication FAILED',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'replication_complete': {
        'title': '{hostname}: Replication complete - {vmid}',
        'body': 'Replication of {vmname} ({vmid}) completed successfully.',
        'label': 'Replication complete',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    # ── Backup / Snapshot events ──
    'backup_start': {
        'title': '{hostname}: Backup started [{storage}]',
        'body': '{reason}',
        'label': 'Backup started',
        'group': 'backup',
        'default_enabled': False,
    },
    # For backup_complete / backup_fail, render_template() replaces the body
    # below with the vzdump report (parsed, or raw if parsing fails) whenever
    # the event data carries a 'pve_message'.
    'backup_complete': {
        'title': '{hostname}: Backup complete - {vmid}',
        'body': 'Backup of {vmname} ({vmid}) completed successfully.\nSize: {size}',
        'label': 'Backup complete',
        'group': 'backup',
        'default_enabled': True,
    },
    'backup_fail': {
        'title': '{hostname}: Backup FAILED - {vmid}',
        'body': 'Backup of {vmname} ({vmid}) has failed.\n{reason}',
        'label': 'Backup FAILED',
        'group': 'backup',
        'default_enabled': True,
    },
    'snapshot_complete': {
        'title': '{hostname}: Snapshot created - {vmid}',
        'body': 'Snapshot of {vmname} ({vmid}) created: {snapshot_name}',
        'label': 'Snapshot created',
        'group': 'backup',
        'default_enabled': False,
    },
    'snapshot_fail': {
        'title': '{hostname}: Snapshot FAILED - {vmid}',
        'body': 'Snapshot of {vmname} ({vmid}) failed.\n{reason}',
        'label': 'Snapshot FAILED',
        'group': 'backup',
        'default_enabled': True,
    },
    # ── Resource events (from Health Monitor) ──
    # The disk/storage entries in this section are filed under group 'storage'.
    'cpu_high': {
        'title': '{hostname}: High CPU usage ({value}%)',
        'body': 'CPU usage is at {value}% on {cores} cores.\n{details}',
        'label': 'High CPU usage',
        'group': 'resources',
        'default_enabled': True,
    },
    'ram_high': {
        'title': '{hostname}: High memory usage ({value}%)',
        'body': 'Memory usage: {used} / {total} ({value}%).\n{details}',
        'label': 'High memory usage',
        'group': 'resources',
        'default_enabled': True,
    },
    'temp_high': {
        'title': '{hostname}: High temperature ({value}C)',
        'body': 'CPU temperature: {value}C (threshold: {threshold}C).\n{details}',
        'label': 'High temperature',
        'group': 'resources',
        'default_enabled': True,
    },
    'disk_space_low': {
        'title': '{hostname}: Low disk space on {mount}',
        'body': '{mount}: {used}% used ({available} available).',
        'label': 'Low disk space',
        'group': 'storage',
        'default_enabled': True,
    },
    'disk_io_error': {
        'title': '{hostname}: Disk failure detected on {device}',
        'body': '{reason}',
        'label': 'Disk failure / I/O error',
        'group': 'storage',
        'default_enabled': True,
    },
    'storage_unavailable': {
        'title': '{hostname}: Storage unavailable - {storage_name}',
        'body': 'PVE storage "{storage_name}" ({storage_type}) is not available.\n{reason}',
        'label': 'Storage unavailable',
        'group': 'storage',
        'default_enabled': True,
    },
    'load_high': {
        'title': '{hostname}: High system load ({value})',
        'body': 'System load average: {value} on {cores} cores.\n{details}',
        'label': 'High system load',
        'group': 'resources',
        'default_enabled': True,
    },
    # ── Network events ──
    'network_down': {
        'title': '{hostname}: Network connectivity lost',
        'body': 'Network connectivity check failed.\n{reason}',
        'label': 'Network connectivity lost',
        'group': 'network',
        'default_enabled': True,
    },
    'network_latency': {
        'title': '{hostname}: High network latency ({value}ms)',
        'body': 'Latency to gateway: {value}ms (threshold: {threshold}ms).',
        'label': 'High network latency',
        'group': 'network',
        'default_enabled': False,
    },
    # ── Security events ──
    'auth_fail': {
        'title': '{hostname}: Authentication failure',
        'body': 'Failed login attempt from {source_ip}.\nUser: {username}\nService: {service}',
        'label': 'Authentication failure',
        'group': 'security',
        'default_enabled': True,
    },
    'ip_block': {
        'title': '{hostname}: IP blocked by Fail2Ban',
        'body': 'IP {source_ip} has been banned.\nJail: {jail}\nFailures: {failures}',
        'label': 'IP blocked by Fail2Ban',
        'group': 'security',
        'default_enabled': True,
    },
    'firewall_issue': {
        'title': '{hostname}: Firewall issue detected',
        'body': '{reason}',
        'label': 'Firewall issue detected',
        'group': 'security',
        'default_enabled': True,
    },
    'user_permission_change': {
        'title': '{hostname}: User permission changed',
        'body': 'User: {username}\nChange: {change_details}',
        'label': 'User permission changed',
        'group': 'security',
        'default_enabled': True,
    },
    # ── Cluster events ──
    'split_brain': {
        'title': '{hostname}: SPLIT-BRAIN detected',
        'body': 'Cluster split-brain condition detected.\nQuorum status: {quorum}',
        'label': 'SPLIT-BRAIN detected',
        'group': 'cluster',
        'default_enabled': True,
    },
    'node_disconnect': {
        'title': '{hostname}: Node disconnected',
        'body': 'Node {node_name} has disconnected from the cluster.',
        'label': 'Node disconnected',
        'group': 'cluster',
        'default_enabled': True,
    },
    'node_reconnect': {
        'title': '{hostname}: Node reconnected',
        'body': 'Node {node_name} has reconnected to the cluster.',
        'label': 'Node reconnected',
        'group': 'cluster',
        'default_enabled': True,
    },
    # ── Services events ──
    'system_shutdown': {
        'title': '{hostname}: System shutting down',
        'body': '{reason}',
        'label': 'System shutting down',
        'group': 'services',
        'default_enabled': True,
    },
    'system_reboot': {
        'title': '{hostname}: System rebooting',
        'body': '{reason}',
        'label': 'System rebooting',
        'group': 'services',
        'default_enabled': True,
    },
    'system_problem': {
        'title': '{hostname}: System problem detected',
        'body': '{reason}',
        'label': 'System problem detected',
        'group': 'services',
        'default_enabled': True,
    },
    'service_fail': {
        'title': '{hostname}: Service failed - {service_name}',
        'body': '{reason}',
        'label': 'Service failed',
        'group': 'services',
        'default_enabled': True,
    },
    'oom_kill': {
        'title': '{hostname}: OOM Kill - {process}',
        'body': '{reason}',
        'label': 'Out of memory kill',
        'group': 'services',
        'default_enabled': True,
    },
    # ── Hidden internal templates (not shown in UI) ──
    'service_fail_batch': {
        'title': '{hostname}: {service_count} services failed',
        'body': '{reason}',
        'label': 'Service fail batch',
        'group': 'services',
        'default_enabled': True,
        'hidden': True,
    },
    # For system_mail, render_template() uses the event's 'pve_message'
    # (truncated to 1000 characters) as the body when one is present.
    'system_mail': {
        'title': '{hostname}: {pve_title}',
        'body': '{reason}',
        'label': 'PVE system mail',
        'group': 'other',
        'default_enabled': True,
        'hidden': True,
    },
    'webhook_test': {
        'title': '{hostname}: Webhook test received',
        'body': 'PVE webhook connectivity test successful.\n{reason}',
        'label': 'Webhook test',
        'group': 'other',
        'default_enabled': True,
        'hidden': True,
    },
    # Legacy counterpart of 'update_summary' (same placeholders); kept hidden.
    'update_available': {
        'title': '{hostname}: Updates available',
        'body': 'Total updates: {total_count}\nSecurity: {security_count}\nProxmox: {pve_count}\nKernel: {kernel_count}\nImportant packages:\n{important_list}',
        'label': 'Updates available (legacy)',
        'group': 'updates',
        'default_enabled': False,
        'hidden': True,
    },
    'unknown_persistent': {
        'title': '{hostname}: Check unavailable - {category}',
        'body': 'Health check for {category} has been unavailable for 3+ cycles.\n{reason}',
        'label': 'Check unavailable',
        'group': 'health',
        'default_enabled': False,
        'hidden': True,
    },
    # ── Health Monitor events ──
    'health_persistent': {
        'title': '{hostname}: {count} active health issue(s)',
        'body': 'The following health issues remain active:\n{issue_list}\n\nThis digest is sent once every 24 hours while issues persist.',
        'label': 'Active health issues (daily)',
        'group': 'health',
        'default_enabled': True,
    },
    'health_issue_new': {
        'title': '{hostname}: New health issue - {category}',
        'body': 'New {severity} issue detected:\n{reason}',
        'label': 'New health issue',
        'group': 'health',
        'default_enabled': True,
    },
    'health_issue_resolved': {
        'title': '{hostname}: Resolved - {category}',
        'body': '{category} issue has been resolved.\n{reason}\nDuration: {duration}',
        'label': 'Health issue resolved',
        'group': 'health',
        'default_enabled': True,
        'hidden': True,  # Use error_resolved instead (avoids duplicate in UI)
    },
    # ── Update notifications ──
    'update_summary': {
        'title': '{hostname}: Updates available',
        'body': (
            'Total updates: {total_count}\n'
            'Security updates: {security_count}\n'
            'Proxmox-related updates: {pve_count}\n'
            'Kernel updates: {kernel_count}\n'
            'Important packages:\n{important_list}'
        ),
        'label': 'Updates available',
        'group': 'updates',
        'default_enabled': True,
    },
    'pve_update': {
        'title': '{hostname}: Proxmox VE {new_version} available',
        'body': 'Proxmox VE {current_version} -> {new_version}\n{details}',
        'label': 'Proxmox VE update available',
        'group': 'updates',
        'default_enabled': True,
    },
    'update_complete': {
        'title': '{hostname}: Update completed',
        'body': '{details}',
        'label': 'Update completed',
        'group': 'updates',
        'default_enabled': False,
    },
    # ── Burst aggregation summaries (hidden -- auto-generated by BurstAggregator) ──
    # These inherit enabled state from their parent event type at dispatch time.
    'burst_auth_fail': {
        'title': '{hostname}: {count} auth failures in {window}',
        'body': '{count} authentication failures detected in {window}.\nSources: {entity_list}',
        'label': 'Auth failures burst',
        'group': 'security',
        'default_enabled': True,
        'hidden': True,
    },
    'burst_ip_block': {
        'title': '{hostname}: Fail2Ban banned {count} IPs in {window}',
        'body': '{count} IPs banned by Fail2Ban in {window}.\nIPs: {entity_list}',
        'label': 'IP block burst',
        'group': 'security',
        'default_enabled': True,
        'hidden': True,
    },
    'burst_disk_io': {
        'title': '{hostname}: {count} disk I/O errors on {entity_list}',
        'body': '{count} I/O errors detected in {window}.\nDevices: {entity_list}',
        'label': 'Disk I/O burst',
        'group': 'storage',
        'default_enabled': True,
        'hidden': True,
    },
    'burst_cluster': {
        'title': '{hostname}: Cluster flapping detected ({count} changes)',
        'body': 'Cluster state changed {count} times in {window}.\nNodes: {entity_list}',
        'label': 'Cluster flapping burst',
        'group': 'cluster',
        'default_enabled': True,
        'hidden': True,
    },
    'burst_service_fail': {
        'title': '{hostname}: {count} services failed in {window}',
        'body': '{count} service failures detected in {window}.\nThis typically indicates a node reboot or PVE service restart.\n\nAdditional failures:\n{details}',
        'label': 'Service fail burst',
        'group': 'services',
        'default_enabled': True,
        'hidden': True,
    },
    'burst_system': {
        'title': '{hostname}: {count} system problems in {window}',
        'body': '{count} system problems detected in {window}.\n\nAdditional issues:\n{details}',
        'label': 'System problems burst',
        'group': 'services',
        'default_enabled': True,
        'hidden': True,
    },
    'burst_generic': {
        'title': '{hostname}: {count} {event_type} events in {window}',
        'body': '{count} events of type {event_type} in {window}.\n\nAdditional events:\n{details}',
        'label': 'Generic burst',
        'group': 'other',
        'default_enabled': True,
        'hidden': True,
    },
}
# ─── Event Groups (for UI filtering) ─────────────────────────────
# Group key -> display metadata. The keys are the values used in the 'group'
# field of TEMPLATES entries; 'label' is the short heading and 'description'
# a one-line summary of what the group covers.
EVENT_GROUPS = {
    'vm_ct': {'label': 'VM / CT', 'description': 'Start, stop, crash, migration'},
    'backup': {'label': 'Backups', 'description': 'Backup start, complete, fail'},
    'resources': {'label': 'Resources', 'description': 'CPU, memory, temperature'},
    'storage': {'label': 'Storage', 'description': 'Disk space, I/O, SMART'},
    'network': {'label': 'Network', 'description': 'Connectivity, bond, latency'},
    'security': {'label': 'Security', 'description': 'Auth failures, Fail2Ban, firewall'},
    'cluster': {'label': 'Cluster', 'description': 'Quorum, split-brain, HA fencing'},
    'services': {'label': 'Services', 'description': 'System services, shutdown, reboot'},
    'health': {'label': 'Health Monitor', 'description': 'Health checks, degradation, recovery'},
    'updates': {'label': 'Updates', 'description': 'System and PVE updates'},
    # Also the group render_template() assigns to unknown event types.
    'other': {'label': 'Other', 'description': 'Uncategorized notifications'},
}
# ─── Template Renderer ───────────────────────────────────────────
def _get_hostname() -> str:
    """Return the short (first DNS label) hostname for message titles.

    Falls back to 'proxmox' when the hostname cannot be read or is empty,
    so titles never start with a bare ": ".
    """
    try:
        short_name = socket.gethostname().split('.')[0]
    except OSError:
        return 'proxmox'
    return short_name or 'proxmox'
class _BlankMissing(dict):
    """Mapping for ``str.format_map`` that renders unknown placeholders as ''."""

    def __missing__(self, key: str) -> str:
        return ''


def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Render a template into a structured notification object.

    Args:
        event_type: Key into ``TEMPLATES``. Unknown types are still rendered
            (group 'other') so no notification is silently dropped.
        data: Event variables. They override the built-in defaults;
            placeholders that are neither supplied nor defaulted render as
            an empty string instead of leaking raw ``{name}`` text.

    Returns:
        Dict usable by all channels with keys: title, body (alias of
        body_text, kept for backward compatibility), body_text, body_html
        (HTML-escaped, one <p> per non-blank line), fields (list of
        (label, value) tuples), tags, severity, group.
    """
    import html as html_mod

    template = TEMPLATES.get(event_type)
    if not template:
        # Catch-all: unknown event types always get delivered (group 'other')
        # so no Proxmox notification is ever silently dropped. Prefer the
        # first non-empty text and always hand channels a string.
        fallback_body = str(
            data.get('message') or data.get('reason') or data)
        severity = data.get('severity', 'INFO')
        return {
            'title': f"{_get_hostname()}: {event_type}",
            'body': fallback_body, 'body_text': fallback_body,
            'body_html': f'<p>{html_mod.escape(fallback_body)}</p>',
            'fields': [], 'tags': [severity, 'other', event_type],
            'severity': severity, 'group': 'other',
        }

    # Defaults guarantee hostname/timestamp and give common placeholders a
    # neutral value; caller data wins.
    variables = {
        'hostname': _get_hostname(),
        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
        'severity': data.get('severity', 'INFO'),
        # Burst event variables
        'window': '', 'entity_list': '',
        # Common defaults
        'vmid': '', 'vmname': '', 'reason': '', 'summary': '',
        'details': '', 'category': '', 'previous': '', 'current': '',
        'duration': '', 'value': '', 'threshold': '',
        'source_ip': '', 'username': '', 'service': '', 'service_name': '',
        'node_name': '', 'target_node': '', 'mount': '', 'device': '',
        'used': '', 'total': '', 'available': '', 'cores': '',
        'count': '', 'size': '', 'snapshot_name': '', 'jail': '',
        'failures': '', 'quorum': '', 'change_details': '', 'message': '',
        'security_count': '0', 'total_count': '0', 'package_list': '',
        'pve_count': '0', 'kernel_count': '0',
        'packages': '', 'pve_packages': '', 'version': '',
        'issue_list': '', 'error_key': '',
        'storage_name': '', 'storage_type': '',
    }
    variables.update(data)
    # Placeholders without a default (e.g. {process}, {storage}) become ''.
    values = _BlankMissing(variables)

    try:
        title = template['title'].format_map(values)
    except (KeyError, ValueError):
        title = template['title']

    # ── PVE vzdump special formatting ──
    # When the event came from the PVE webhook with a full vzdump message,
    # parse the table/logs and build a rich body instead of the sparse
    # template.
    pve_message = data.get('pve_message', '')
    pve_title = data.get('pve_title', '')
    if event_type in ('backup_complete', 'backup_fail') and pve_message:
        parsed = _parse_vzdump_message(pve_message)
        if parsed:
            is_success = (event_type == 'backup_complete')
            body_text = _format_vzdump_body(parsed, is_success)
            # Use PVE's own title if available (contains hostname and status)
            if pve_title:
                title = pve_title
        else:
            # Couldn't parse -- use the raw PVE message as body
            body_text = pve_message.strip()
    elif event_type == 'system_mail' and pve_message:
        # System mail -- use PVE message directly (mail bounce, cron, smartd)
        body_text = pve_message.strip()[:1000]
    else:
        try:
            body_text = template['body'].format_map(values)
        except (KeyError, ValueError):
            body_text = template['body']

    # Clean up: trim the ends and collapse runs of 3+ newlines into one
    # blank line.
    body_text = re.sub(r'\n{3,}', '\n\n', body_text.strip())

    severity = variables.get('severity', 'INFO')
    # 'other' is the catch-all key defined in EVENT_GROUPS.
    group = template.get('group', 'other')

    # Structured fields for Discord embeds / rich notifications; only
    # non-empty values are included.
    field_map = [
        ('vmid', 'VM/CT'), ('vmname', 'Name'), ('device', 'Device'),
        ('source_ip', 'Source IP'), ('node_name', 'Node'), ('category', 'Category'),
        ('service_name', 'Service'), ('jail', 'Jail'), ('username', 'User'),
        ('count', 'Count'), ('window', 'Window'), ('entity_list', 'Affected'),
    ]
    fields = []
    for key, label in field_map:
        val = variables.get(key, '')
        if val:
            fields.append((label, str(val)))

    # HTML body: one escaped paragraph per non-blank line.
    body_html_parts = [
        f'<p>{html_mod.escape(line)}</p>'
        for line in body_text.split('\n')
        if line.strip()
    ]
    if body_html_parts:
        body_html = '\n'.join(body_html_parts)
    else:
        body_html = f'<p>{html_mod.escape(body_text)}</p>'

    return {
        'title': title,
        'body': body_text,  # backward compat
        'body_text': body_text,
        'body_html': body_html,
        'fields': fields,
        'tags': [severity, group, event_type],
        'severity': severity,
        'group': group,
    }
def get_event_types_by_group() -> Dict[str, list]:
    """Return the visible event types bucketed by group, for UI rendering.

    Templates flagged ``hidden`` (burst aggregations, internal and
    deprecated types) are left out; they keep working in the backend.

    Returns:
        {group_key: [{'type': event_type, 'title': label,
                      'default_enabled': bool}, ...]}
    """
    grouped: Dict[str, list] = {}
    for name, spec in TEMPLATES.items():
        if spec.get('hidden', False):
            continue

        bucket = grouped.setdefault(spec.get('group', 'other'), [])

        # Prefer the explicit label; otherwise strip the placeholders out of
        # the title, and as a last resort prettify the event key itself.
        caption = spec.get('label', '')
        if not caption:
            bare_title = spec['title'].replace('{hostname}', '').strip(': ')
            caption = re.sub(r'\s*\{[^}]+\}', '', bare_title).strip(' -:')
        if not caption:
            caption = name.replace('_', ' ').title()

        bucket.append({
            'type': name,
            'title': caption,
            'default_enabled': spec.get('default_enabled', True),
        })
    return grouped
def get_default_enabled_events() -> Dict[str, bool]:
    """Map every event type (hidden ones included) to its default on/off state.

    Templates that do not declare ``default_enabled`` count as enabled.
    """
    defaults: Dict[str, bool] = {}
    for name, spec in TEMPLATES.items():
        defaults[name] = spec.get('default_enabled', True)
    return defaults
# ─── Emoji Enrichment (per-channel opt-in) ──────────────────────
# Category-level header icons
# Group key (same keys as EVENT_GROUPS) -> header icon. enrich_with_emojis()
# falls back to this map when EVENT_EMOJI has no entry for the event type.
CATEGORY_EMOJI = {
    'vm_ct': '\U0001F5A5\uFE0F',  # desktop computer
    'backup': '\U0001F4BE',  # floppy disk (backup)
    'resources': '\U0001F4CA',  # bar chart
    'storage': '\U0001F4BD',  # minidisc / hard disk
    'network': '\U0001F310',  # globe with meridians
    'security': '\U0001F6E1\uFE0F',  # shield
    'cluster': '\U0001F517',  # chain link
    'services': '\u2699\uFE0F',  # gear
    'health': '\U0001FA7A',  # stethoscope
    'updates': '\U0001F504',  # counterclockwise arrows (update)
    'other': '\U0001F4E8',  # incoming envelope
}
# Event-specific title icons (override category default when present)
# Event type (TEMPLATES key) -> title icon. This is the first choice in
# enrich_with_emojis(); event types without an entry here (e.g. the hidden
# burst_* summaries) fall back to CATEGORY_EMOJI and then SEVERITY_ICONS.
EVENT_EMOJI = {
    # VM / CT
    'vm_start': '\u25B6\uFE0F',  # play button
    'vm_stop': '\u23F9\uFE0F',  # stop button
    'vm_shutdown': '\u23CF\uFE0F',  # eject
    'vm_fail': '\U0001F4A5',  # collision (crash)
    'vm_restart': '\U0001F504',  # cycle
    'ct_start': '\u25B6\uFE0F',
    'ct_stop': '\u23F9\uFE0F',
    'ct_shutdown': '\u23CF\uFE0F',
    'ct_restart': '\U0001F504',
    'ct_fail': '\U0001F4A5',
    'migration_start': '\U0001F69A',  # moving truck
    'migration_complete': '\u2705',  # check mark
    'migration_fail': '\u274C',  # cross mark
    'replication_fail': '\u274C',
    'replication_complete': '\u2705',
    # Backups
    'backup_start': '\U0001F4E6',  # package
    'backup_complete': '\u2705',
    'backup_fail': '\u274C',
    'snapshot_complete': '\U0001F4F8',  # camera with flash
    'snapshot_fail': '\u274C',
    # Resources
    'cpu_high': '\U0001F525',  # fire
    'ram_high': '\U0001F4A7',  # droplet
    'temp_high': '\U0001F321\uFE0F',  # thermometer
    'load_high': '\u26A0\uFE0F',  # warning
    # Storage
    'disk_space_low': '\U0001F4C9',  # chart decreasing
    'disk_io_error': '\U0001F4A5',
    'storage_unavailable': '\U0001F6AB',  # prohibited
    # Network
    'network_down': '\U0001F50C',  # electric plug
    'network_latency': '\U0001F422',  # turtle (slow)
    # Security
    'auth_fail': '\U0001F6A8',  # police light
    'ip_block': '\U0001F6B7',  # no pedestrians (banned)
    'firewall_issue': '\U0001F525',
    'user_permission_change': '\U0001F511',  # key
    # Cluster
    'split_brain': '\U0001F4A2',  # anger symbol
    'node_disconnect': '\U0001F50C',
    'node_reconnect': '\u2705',
    # Services
    'system_shutdown': '\u23FB\uFE0F',  # power symbol (Unicode)
    'system_reboot': '\U0001F504',
    'system_problem': '\u26A0\uFE0F',
    'service_fail': '\u274C',
    'oom_kill': '\U0001F4A3',  # bomb
    # Health
    'new_error': '\U0001F198',  # SOS
    'error_resolved': '\u2705',
    'error_escalated': '\U0001F53A',  # red triangle up
    'health_degraded': '\u26A0\uFE0F',
    'health_persistent': '\U0001F4CB',  # clipboard
    # Updates
    'update_summary': '\U0001F4E6',
    'pve_update': '\U0001F195',  # NEW
    'update_complete': '\u2705',
}
# Decorative field-level icons for body text enrichment
# Template variable name -> decorative icon. enrich_with_emojis() iterates
# this map while scanning body lines for "Label: value" patterns.
# NOTE(review): the exact label-matching rules sit in the part of
# enrich_with_emojis() beyond this excerpt -- confirm there before renaming keys.
FIELD_EMOJI = {
    'hostname': '\U0001F4BB',  # laptop
    'vmid': '\U0001F194',  # ID button
    'vmname': '\U0001F3F7\uFE0F',  # label
    'device': '\U0001F4BD',  # disk
    'mount': '\U0001F4C2',  # open folder
    'source_ip': '\U0001F310',  # globe
    'username': '\U0001F464',  # bust in silhouette
    'service_name': '\u2699\uFE0F',  # gear
    'node_name': '\U0001F5A5\uFE0F',  # computer
    'target_node': '\U0001F3AF',  # direct hit (target)
    'category': '\U0001F4CC',  # pushpin
    'severity': '\U0001F6A6',  # traffic light
    'duration': '\u23F1\uFE0F',  # stopwatch
    'timestamp': '\U0001F552',  # clock three
    'size': '\U0001F4CF',  # ruler
    'reason': '\U0001F4DD',  # memo
    'value': '\U0001F4CA',  # chart
    'threshold': '\U0001F6A7',  # construction
    'jail': '\U0001F512',  # lock
    'failures': '\U0001F522',  # input numbers
    'quorum': '\U0001F465',  # busts in silhouette
    'total_count': '\U0001F4E6',  # package
    'security_count': '\U0001F6E1\uFE0F',  # shield
    'pve_count': '\U0001F4E6',
    'kernel_count': '\u2699\uFE0F',
    'important_list': '\U0001F4CB',  # clipboard
}
def enrich_with_emojis(event_type: str, title: str, body: str,
                       data: Dict[str, Any]) -> tuple:
    """Return emoji-enriched versions of a notification title and body.

    The title icon is chosen by priority: the event-specific icon from
    EVENT_EMOJI, then the icon of the template's group from CATEGORY_EMOJI,
    then the severity icon from SEVERITY_ICONS. If the title starts with a
    severity icon, that icon is replaced by the chosen one; otherwise the
    chosen icon is prepended.

    Body lines that begin with a recognised field label followed by ':' or a
    space (case-insensitive) are prefixed with that field's icon from
    FIELD_EMOJI. Such lines are emitted stripped of surrounding whitespace;
    every other line is kept verbatim.

    The function is idempotent: a title that already starts with the chosen
    icon is returned unchanged, and body lines that already carry an icon no
    longer start with a label, so they are left alone.

    Args:
        event_type: Event key used to look up the template and event icon.
        title: Plain notification title.
        body: Plain notification body (newline-separated lines).
        data: Event data; only ``data['severity']`` is read (default 'INFO').

    Returns:
        Tuple ``(enriched_title, enriched_body)``.
    """
    # Pick the best title icon: event-specific > category > severity circle
    template = TEMPLATES.get(event_type, {})
    group = template.get('group', 'other')
    severity = data.get('severity', 'INFO')
    icon = (EVENT_EMOJI.get(event_type)
            or CATEGORY_EMOJI.get(group)
            or SEVERITY_ICONS.get(severity, ''))

    enriched_title = title
    # Skip titles that already carry the chosen icon (previous pass);
    # without this guard a second call would prepend the icon again.
    if icon and not title.startswith(icon):
        for sev_icon in SEVERITY_ICONS.values():
            if title.startswith(sev_icon):
                # Replace the existing severity circle with the richer icon
                enriched_title = title.replace(sev_icon, icon, 1)
                break
        else:
            # No severity circle present: "host: X" -> "ICON host: X"
            enriched_title = f'{icon} {title}'

    # Labels used in message bodies that differ from the raw field key
    label_overrides = {
        'vmid': 'VM/CT', 'vmname': 'Name', 'source_ip': 'Source IP',
        'service_name': 'Service', 'node_name': 'Node',
        'target_node': 'Target', 'total_count': 'Total updates',
        'security_count': 'Security updates',
        'pve_count': 'Proxmox-related updates',
        'kernel_count': 'Kernel updates',
        'important_list': 'Important packages',
        'duration': 'Duration', 'severity': 'Previous severity',
        'original_severity': 'Previous severity',
    }

    # Build the lowercase "label:" / "label " prefixes once per call instead
    # of once per body line. FIELD_EMOJI order is kept: first match wins.
    field_prefixes = []
    for field_key, field_icon in FIELD_EMOJI.items():
        labels = [field_key.replace('_', ' ')]
        if field_key in label_overrides:
            labels.append(label_overrides[field_key])
        prefixes = tuple(
            label.lower() + sep for label in labels for sep in (':', ' ')
        )
        field_prefixes.append((field_icon, prefixes))

    # Prepend field emojis to recognisable "Label: value" lines
    enriched_lines = []
    for line in body.split('\n'):
        stripped = line.strip()
        matched_icon = None
        if stripped:
            lowered = stripped.lower()
            matched_icon = next(
                (field_icon for field_icon, prefixes in field_prefixes
                 if lowered.startswith(prefixes)),
                None,
            )
        if matched_icon is None:
            enriched_lines.append(line)
        else:
            enriched_lines.append(f'{matched_icon} {stripped}')

    return enriched_title, '\n'.join(enriched_lines)
# ─── AI Enhancement (Optional) ───────────────────────────────────
# Language codes accepted for AI translation, mapped to the English language
# name that is substituted into the system prompt.
AI_LANGUAGES = {
    'en': 'English',
    'es': 'Spanish',
    'fr': 'French',
    'de': 'German',
    'pt': 'Portuguese',
    'it': 'Italian',
    'ru': 'Russian',
    'sv': 'Swedish',
    'no': 'Norwegian',
    'ja': 'Japanese',
    'zh': 'Chinese',
    'nl': 'Dutch',
}
# Maximum response tokens requested from the AI provider per detail level
AI_DETAIL_TOKENS = {
    'brief': 100,     # 2-3 lines, essential only
    'standard': 200,  # Concise paragraph with context
    'detailed': 400,  # Complete technical details
}
# System prompt template - informative, no recommendations.
# Filled in with str.format(); placeholders: {language}, {detail_level} and
# {emoji_instructions} (the latter continues the numbered rule list as rule 8).
# Any literal braces added to this text must be doubled to survive .format().
AI_SYSTEM_PROMPT = """You are a technical assistant for ProxMenux Monitor, a Proxmox server monitoring system.
Your task is to translate and format system alerts to {language}.
STRICT RULES:
1. Translate the message to the requested language
2. Maintain an INFORMATIVE and OBJECTIVE tone
3. DO NOT use formal introductions ("Dear...", "Esteemed...")
4. DO NOT give recommendations or action suggestions
5. DO NOT interpret data subjectively
6. Present only FACTS and TECHNICAL DATA
7. Respect the requested detail level: {detail_level}
{emoji_instructions}
DETAIL LEVELS:
- brief: 2-3 lines maximum, only essential information
- standard: Concise paragraph with basic context
- detailed: Complete information with all available technical details
MESSAGE TYPES:
- Some messages come from Proxmox VE webhooks with raw system data (backup logs, update lists, SMART errors)
- Parse and present this data clearly, extracting key information (VM IDs, sizes, durations, errors)
- For backup messages: highlight status (OK/ERROR), VM names, sizes, and duration
- For update messages: list package names and counts
- For disk/SMART errors: highlight affected device and error type
If journal log context is provided, use it for more precise event information."""
# Emoji instructions for rich format channels.
# Substituted for {emoji_instructions} in AI_SYSTEM_PROMPT when emojis are
# requested; worded as rule 8 so it continues that prompt's numbered list.
AI_EMOJI_INSTRUCTIONS = """
8. ENRICH with contextual emojis and icons:
- Use appropriate emojis at the START of the title/message to indicate severity and type
- Severity indicators: Use a colored circle at the start (info=blue, warning=yellow, critical=red)
- Add relevant technical emojis: disk, server, network, security, backup, etc.
- Keep emojis contextual and professional, not decorative
- Examples of appropriate emojis:
* Disk/Storage: disk, folder, file
* Network: globe, signal, connection
* Security: shield, lock, key, warning
* System: gear, server, computer
* Status: checkmark, cross, warning, info
* Backup: save, sync, cloud
* Performance: chart, speedometer"""
# No-emoji instructions for email/plain channels.
# Substituted for {emoji_instructions} in AI_SYSTEM_PROMPT when emojis are
# not requested; worded as rule 8 so it continues that prompt's numbered list.
AI_NO_EMOJI_INSTRUCTIONS = """
8. DO NOT use emojis or special icons - plain text only for email compatibility"""
class AIEnhancer:
"""AI message enhancement using pluggable providers.
Supports 6 providers: Groq, OpenAI, Anthropic, Gemini, Ollama, OpenRouter.
Translates and formats notifications based on configured language and detail level.
"""
def __init__(self, config: Dict[str, Any]):
"""Initialize AIEnhancer with configuration.
Args:
config: Dictionary containing:
- ai_provider: Provider name (groq, openai, anthropic, gemini, ollama, openrouter)
- ai_api_key: API key (not required for ollama)
- ai_model: Optional model override
- ai_language: Target language code (en, es, fr, etc.)
- ai_ollama_url: URL for Ollama server (optional)
"""
self.config = config
self._provider = None
self._init_provider()
def _init_provider(self):
"""Initialize the AI provider based on configuration."""
try:
# Import here to avoid circular imports
import sys
import os
# Add script directory to path for ai_providers import
script_dir = os.path.dirname(os.path.abspath(__file__))
if script_dir not in sys.path:
sys.path.insert(0, script_dir)
from ai_providers import get_provider
provider_name = self.config.get('ai_provider', 'groq')
self._provider = get_provider(
provider_name,
api_key=self.config.get('ai_api_key', ''),
model=self.config.get('ai_model', ''),
base_url=self.config.get('ai_ollama_url', ''),
)
except Exception as e:
print(f"[AIEnhancer] Failed to initialize provider: {e}")
self._provider = None
@property
def enabled(self) -> bool:
"""Check if AI enhancement is available."""
return self._provider is not None
def enhance(self, title: str, body: str, severity: str,
detail_level: str = 'standard',
journal_context: str = '',
use_emojis: bool = False) -> Optional[str]:
"""Enhance/translate notification with AI.
Args:
title: Notification title
body: Notification body text
severity: Severity level (info, warning, critical)
detail_level: Level of detail (brief, standard, detailed)
journal_context: Optional journal log lines for context
use_emojis: Whether to include emojis in the response (for push channels)
Returns:
Enhanced/translated text or None if failed
"""
if not self._provider:
return None
# Get language settings
language_code = self.config.get('ai_language', 'en')
language_name = AI_LANGUAGES.get(language_code, 'English')
# Get token limit for detail level
max_tokens = AI_DETAIL_TOKENS.get(detail_level, 200)
# Select emoji instructions based on channel type
emoji_instructions = AI_EMOJI_INSTRUCTIONS if use_emojis else AI_NO_EMOJI_INSTRUCTIONS
# Build system prompt with emoji instructions
system_prompt = AI_SYSTEM_PROMPT.format(
language=language_name,
detail_level=detail_level,
emoji_instructions=emoji_instructions
)
# Build user message
user_msg = f"Severity: {severity}\nTitle: {title}\nMessage:\n{body}"
if journal_context:
user_msg += f"\n\nJournal log context:\n{journal_context}"
try:
result = self._provider.generate(system_prompt, user_msg, max_tokens)
return result
except Exception as e:
print(f"[AIEnhancer] Enhancement failed: {e}")
return None
def test_connection(self) -> Dict[str, Any]:
"""Test the AI provider connection.
Returns:
Dict with success, message, and model info
"""
if not self._provider:
return {
'success': False,
'message': 'Provider not initialized',
'model': ''
}
return self._provider.test_connection()
def format_with_ai(title: str, body: str, severity: str,
                   ai_config: Dict[str, Any],
                   detail_level: str = 'standard',
                   journal_context: str = '',
                   use_emojis: bool = False) -> str:
    """Return the AI-enhanced/translated body, or the original body.

    The original ``body`` is returned untouched when AI is disabled, when the
    required credential is missing (``ai_ollama_url`` for the ollama
    provider, ``ai_api_key`` for every other provider), or when the
    enhancer yields nothing.

    Args:
        title: Notification title
        body: Notification body
        severity: Severity level
        ai_config: Configuration dictionary with AI settings
        detail_level: Level of detail (brief, standard, detailed)
        journal_context: Optional journal log context
        use_emojis: Whether to include emojis (for push channels like
            Telegram/Discord)

    Returns:
        Enhanced body string or original if AI fails
    """
    # 'ai_enabled' may be a real bool or the string 'true' (any case)
    enabled_flag = ai_config.get('ai_enabled')
    if isinstance(enabled_flag, str):
        enabled_flag = enabled_flag.lower() == 'true'
    if not enabled_flag:
        return body

    # Ollama needs a server URL; every other provider needs an API key
    provider_name = ai_config.get('ai_provider', 'groq')
    required_setting = (
        'ai_ollama_url' if provider_name == 'ollama' else 'ai_api_key'
    )
    if not ai_config.get(required_setting):
        return body

    ai_text = AIEnhancer(ai_config).enhance(
        title, body, severity,
        detail_level=detail_level,
        journal_context=journal_context,
        use_emojis=use_emojis,
    )
    if not ai_text:
        return body

    # For detailed level (email), keep the original message for reference so
    # the full technical data survives AI processing. Short originals
    # (50 characters or fewer) are not worth repeating.
    if detail_level == 'detailed' and body and len(body) > 50:
        ai_text = "".join((
            ai_text,
            "\n\n", "-" * 40, "\n",
            "Original message:\n",
            body,
        ))
    return ai_text