Update notification service

This commit is contained in:
MacRimi
2026-02-27 23:45:18 +01:00
parent 026719cd88
commit 3c64ee7af2
13 changed files with 151 additions and 6927 deletions

View File

@@ -3,7 +3,6 @@
import type React from "react"
import { useState, useEffect, useCallback } from "react"
import { fetchApi, getApiUrl, getAuthToken } from "@/lib/api-config"
import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle } from "@/components/ui/dialog"
import { Badge } from "@/components/ui/badge"
import { Button } from "@/components/ui/button"
@@ -123,16 +122,10 @@ export function HealthStatusModal({ open, onOpenChange, getApiUrl }: HealthStatu
let newOverallStatus = "OK"
// Use the new combined endpoint for fewer round-trips
const token = getAuthToken()
const authHeaders: Record<string, string> = {}
if (token) {
authHeaders["Authorization"] = `Bearer ${token}`
}
const response = await fetch(getApiUrl("/api/health/full"), { headers: authHeaders })
const response = await fetch(getApiUrl("/api/health/full"))
if (!response.ok) {
// Fallback to legacy endpoint
const legacyResponse = await fetch(getApiUrl("/api/health/details"), { headers: authHeaders })
const legacyResponse = await fetch(getApiUrl("/api/health/details"))
if (!legacyResponse.ok) throw new Error("Failed to fetch health details")
const data = await legacyResponse.json()
setHealthData(data)
@@ -295,22 +288,15 @@ export function HealthStatusModal({ open, onOpenChange, getApiUrl }: HealthStatu
setDismissingKey(errorKey)
try {
const url = getApiUrl("/api/health/acknowledge")
const token = getAuthToken()
const headers: Record<string, string> = { "Content-Type": "application/json" }
if (token) {
headers["Authorization"] = `Bearer ${token}`
}
const response = await fetch(url, {
const response = await fetch(getApiUrl("/api/health/acknowledge"), {
method: "POST",
headers,
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ error_key: errorKey }),
})
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
throw new Error(errorData.error || `Failed to dismiss error (${response.status})`)
const errorData = await response.json()
throw new Error(errorData.error || "Failed to dismiss error")
}
await fetchHealthDetails()
@@ -422,10 +408,10 @@ export function HealthStatusModal({ open, onOpenChange, getApiUrl }: HealthStatu
key={checkKey}
className="flex items-center justify-between gap-1.5 sm:gap-2 text-[10px] sm:text-xs py-1.5 px-2 sm:px-3 rounded-md hover:bg-muted/40 transition-colors"
>
<div className="flex items-start gap-1.5 sm:gap-2 min-w-0 flex-1">
<span className="mt-0.5 shrink-0">{getStatusIcon(checkData.status, "sm")}</span>
<div className="flex items-center gap-1.5 sm:gap-2 min-w-0 flex-1 overflow-hidden">
{getStatusIcon(checkData.status, "sm")}
<span className="font-medium shrink-0">{formatCheckLabel(checkKey)}</span>
<span className="text-muted-foreground break-words whitespace-pre-wrap min-w-0">{checkData.detail}</span>
<span className="text-muted-foreground truncate block">{checkData.detail}</span>
{checkData.dismissed && (
<Badge variant="outline" className="text-[9px] px-1 py-0 h-4 shrink-0 text-blue-400 border-blue-400/30">
Dismissed
@@ -534,8 +520,8 @@ export function HealthStatusModal({ open, onOpenChange, getApiUrl }: HealthStatu
</div>
{healthData.summary && healthData.summary !== "All systems operational" && (
<div className="text-xs sm:text-sm p-3 rounded-lg bg-muted/20 border overflow-hidden max-w-full">
<p className="font-medium text-foreground break-words whitespace-pre-wrap">{healthData.summary}</p>
<div className="text-sm p-3 rounded-lg bg-muted/20 border overflow-hidden max-w-full">
<p className="font-medium text-foreground truncate" title={healthData.summary}>{healthData.summary}</p>
</div>
)}
@@ -573,7 +559,7 @@ export function HealthStatusModal({ open, onOpenChange, getApiUrl }: HealthStatu
)}
</div>
{reason && !isExpanded && (
<p className="text-[10px] sm:text-xs text-muted-foreground mt-0.5 line-clamp-2 break-words">{reason}</p>
<p className="text-[10px] sm:text-xs text-muted-foreground mt-0.5 truncate" title={reason}>{reason}</p>
)}
</div>
<div className="flex items-center gap-1 sm:gap-2 shrink-0">
@@ -592,7 +578,7 @@ export function HealthStatusModal({ open, onOpenChange, getApiUrl }: HealthStatu
{isExpanded && (
<div className="border-t border-border/50 bg-muted/5 px-1.5 sm:px-2 py-1.5 overflow-hidden">
{reason && (
<p className="text-xs text-muted-foreground px-3 py-1.5 mb-1 break-words whitespace-pre-wrap">{reason}</p>
<p className="text-xs text-muted-foreground px-3 py-1.5 mb-1 break-words">{reason}</p>
)}
{hasChecks ? (
renderChecks(checks, key)

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,6 @@
import { useState, useEffect } from "react"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "./ui/card"
import { Wrench, Package, Ruler, HeartPulse, Cpu, MemoryStick, HardDrive, CircleDot, Network, Server, Settings2, FileText, RefreshCw, Shield, AlertTriangle, Info, Loader2, Check } from "lucide-react"
import { NotificationSettings } from "./notification-settings"
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "./ui/select"
import { Input } from "./ui/input"
import { Badge } from "./ui/badge"
@@ -439,9 +438,6 @@ export function Settings() {
</CardContent>
</Card>
{/* Notification Settings */}
<NotificationSettings />
{/* ProxMenux Optimizations */}
<Card>
<CardHeader>

View File

@@ -34,12 +34,6 @@ interface DiskInfo {
wear_leveling_count?: number // SSD: Wear Leveling Count
total_lbas_written?: number // SSD/NVMe: Total LBAs Written (GB)
ssd_life_left?: number // SSD: SSD Life Left percentage
io_errors?: {
count: number
severity: string
sample: string
reason: string
}
}
interface ZFSPool {
@@ -782,17 +776,6 @@ export function StorageOverview() {
</div>
</div>
{disk.io_errors && disk.io_errors.count > 0 && (
<div className={`flex items-start gap-2 p-2 rounded text-xs ${
disk.io_errors.severity === 'CRITICAL'
? 'bg-red-500/10 text-red-400 border border-red-500/20'
: 'bg-yellow-500/10 text-yellow-400 border border-yellow-500/20'
}`}>
<AlertTriangle className="h-3.5 w-3.5 flex-shrink-0 mt-0.5" />
<span>{disk.io_errors.count} I/O error{disk.io_errors.count !== 1 ? 's' : ''} in 5 min</span>
</div>
)}
<div className="grid grid-cols-2 gap-4 text-sm">
{disk.size_formatted && (
<div>
@@ -858,22 +841,6 @@ export function StorageOverview() {
</div>
</div>
{disk.io_errors && disk.io_errors.count > 0 && (
<div className={`flex items-start gap-2 p-2 rounded text-xs ${
disk.io_errors.severity === 'CRITICAL'
? 'bg-red-500/10 text-red-400 border border-red-500/20'
: 'bg-yellow-500/10 text-yellow-400 border border-yellow-500/20'
}`}>
<AlertTriangle className="h-3.5 w-3.5 flex-shrink-0 mt-0.5" />
<div>
<span className="font-medium">{disk.io_errors.count} I/O error{disk.io_errors.count !== 1 ? 's' : ''} in 5 min</span>
{disk.io_errors.sample && (
<p className="mt-0.5 opacity-80 font-mono truncate max-w-md">{disk.io_errors.sample}</p>
)}
</div>
</div>
)}
<div className="grid grid-cols-2 md:grid-cols-4 gap-4 text-sm">
{disk.size_formatted && (
<div>

View File

@@ -91,11 +91,6 @@ cp "$SCRIPT_DIR/proxmox_storage_monitor.py" "$APP_DIR/usr/bin/" 2>/dev/null || e
cp "$SCRIPT_DIR/flask_script_runner.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_script_runner.py not found"
cp "$SCRIPT_DIR/security_manager.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ security_manager.py not found"
cp "$SCRIPT_DIR/flask_security_routes.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_security_routes.py not found"
cp "$SCRIPT_DIR/notification_manager.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_manager.py not found"
cp "$SCRIPT_DIR/notification_channels.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_channels.py not found"
cp "$SCRIPT_DIR/notification_templates.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_templates.py not found"
cp "$SCRIPT_DIR/notification_events.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ notification_events.py not found"
cp "$SCRIPT_DIR/flask_notification_routes.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_notification_routes.py not found"
echo "📋 Adding translation support..."
cat > "$APP_DIR/usr/bin/translate_cli.py" << 'PYEOF'

View File

@@ -1,695 +0,0 @@
"""
Flask routes for notification service configuration and management.
Blueprint pattern matching flask_health_routes.py / flask_security_routes.py.
"""
import hmac
import time
import hashlib
from collections import deque
from flask import Blueprint, jsonify, request
from notification_manager import notification_manager
# ─── Webhook Hardening Helpers ───────────────────────────────────
class WebhookRateLimiter:
    """Sliding-window rate limiter guarding the webhook endpoint.

    Accepted-request timestamps are kept in a deque; a new request is
    allowed only while fewer than ``max_requests`` of them fall inside
    the trailing ``window_seconds`` window.
    """

    def __init__(self, max_requests: int = 60, window_seconds: int = 60):
        self._max = max_requests
        self._window = window_seconds
        self._timestamps: deque = deque()

    def allow(self) -> bool:
        """Return True (and record the request) if it fits in the window."""
        current = time.time()
        # Evict timestamps that have aged out of the sliding window.
        while self._timestamps:
            if current - self._timestamps[0] > self._window:
                self._timestamps.popleft()
            else:
                break
        if len(self._timestamps) < self._max:
            self._timestamps.append(current)
            return True
        return False
class ReplayCache:
    """Bounded in-memory cache of recently seen request signatures.

    check_and_record() returns True when a signature repeats within the
    TTL (a replay) and records it otherwise.  Expired entries are pruned
    lazily, and _MAX_SIZE is enforced as a true hard cap by evicting the
    oldest entries — previously the cap was only documented, so a flood
    of unique signatures inside one TTL could grow memory without bound.
    """

    _MAX_SIZE = 2000  # Hard cap to prevent memory growth

    def __init__(self, ttl: int = 60):
        self._ttl = ttl
        self._seen: dict = {}  # signature -> timestamp last recorded

    def check_and_record(self, signature: str) -> bool:
        """Return True if this signature was already seen (replay). Records it otherwise."""
        now = time.time()
        # Periodic cleanup: once half full, drop entries older than the TTL.
        if len(self._seen) > self._MAX_SIZE // 2:
            cutoff = now - self._ttl
            self._seen = {k: v for k, v in self._seen.items() if v > cutoff}
        if signature in self._seen and now - self._seen[signature] < self._ttl:
            return True  # Replay detected
        # Enforce the hard cap: if pruning expired entries was not enough,
        # evict the oldest live entries to make room for the new signature.
        if len(self._seen) >= self._MAX_SIZE:
            overflow = len(self._seen) - self._MAX_SIZE + 1
            for key, _ts in sorted(self._seen.items(), key=lambda kv: kv[1])[:overflow]:
                del self._seen[key]
        self._seen[signature] = now
        return False
# Module-level singletons (one per process).  Each worker process gets its
# own limiter/replay cache, so effective limits scale with process count.
_webhook_limiter = WebhookRateLimiter(max_requests=60, window_seconds=60)
_replay_cache = ReplayCache(ttl=60)
# Timestamp validation window (seconds): remote webhook calls must present a
# timestamp within this drift of the server clock (anti-replay check).
_TIMESTAMP_MAX_DRIFT = 60
# Blueprint registered by the main Flask app; routes live under /api/notifications.
notification_bp = Blueprint('notifications', __name__)
@notification_bp.route('/api/notifications/settings', methods=['GET'])
def get_notification_settings():
    """Return every notification setting consumed by the settings UI."""
    try:
        return jsonify(notification_manager.get_settings())
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/settings', methods=['POST'])
def save_notification_settings():
    """Persist notification settings posted by the UI."""
    try:
        payload = request.get_json()
        if payload:
            return jsonify(notification_manager.save_settings(payload))
        return jsonify({'error': 'No data provided'}), 400
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/test', methods=['POST'])
def test_notification():
    """Fire a test notification at one channel, or 'all' when unspecified."""
    try:
        body = request.get_json() or {}
        target = body.get('channel', 'all')
        return jsonify(notification_manager.test_channel(target))
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/status', methods=['GET'])
def get_notification_status():
    """Report the notification service's current status."""
    try:
        return jsonify(notification_manager.get_status())
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/history', methods=['GET'])
def get_notification_history():
    """Return notification history, filtered by optional query parameters."""
    try:
        args = request.args
        history = notification_manager.get_history(
            args.get('limit', 100, type=int),
            args.get('offset', 0, type=int),
            args.get('severity', ''),
            args.get('channel', ''),
        )
        return jsonify(history)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/history', methods=['DELETE'])
def clear_notification_history():
    """Delete every stored notification history entry."""
    try:
        return jsonify(notification_manager.clear_history())
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@notification_bp.route('/api/notifications/send', methods=['POST'])
def send_notification():
    """Send a notification via API (for testing or external triggers)."""
    try:
        body = request.get_json()
        if not body:
            return jsonify({'error': 'No data provided'}), 400
        outcome = notification_manager.send_notification(
            event_type=body.get('event_type', 'custom'),
            severity=body.get('severity', 'INFO'),
            title=body.get('title', ''),
            message=body.get('message', ''),
            data=body.get('data', {}),
            source='api',
        )
        return jsonify(outcome)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
# ── PVE config constants ──
# Names of the webhook endpoint / matcher blocks we own inside PVE's config.
_PVE_ENDPOINT_ID = 'proxmenux-webhook'
_PVE_MATCHER_ID = 'proxmenux-default'
# Loopback URL PVE posts notifications to (this Flask app's webhook route).
_PVE_WEBHOOK_URL = 'http://127.0.0.1:8008/api/notifications/webhook'
# PVE notification config files (priv holds the per-endpoint secret blocks).
_PVE_NOTIFICATIONS_CFG = '/etc/pve/notifications.cfg'
_PVE_PRIV_CFG = '/etc/pve/priv/notifications.cfg'
# Exact header lines of the blocks we create; only these are ever removed.
_PVE_OUR_HEADERS = {
    f'webhook: {_PVE_ENDPOINT_ID}',
    f'matcher: {_PVE_MATCHER_ID}',
}
def _pve_read_file(path):
"""Read file, return (content, error). Content is '' if missing."""
try:
with open(path, 'r') as f:
return f.read(), None
except FileNotFoundError:
return '', None
except PermissionError:
return None, f'Permission denied reading {path}'
except Exception as e:
return None, str(e)
def _pve_backup_file(path):
"""Create timestamped backup if file exists. Never fails fatally."""
import os, shutil
from datetime import datetime
try:
if os.path.exists(path):
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
backup = f"{path}.proxmenux_backup_{ts}"
shutil.copy2(path, backup)
except Exception:
pass
def _pve_remove_our_blocks(text, headers_to_remove):
"""Remove only blocks whose header line matches one of ours.
Preserves ALL other content byte-for-byte.
A block = header line + indented continuation lines + trailing blank line.
"""
lines = text.splitlines(keepends=True)
cleaned = []
skip_block = False
for line in lines:
stripped = line.strip()
if stripped and not line[0:1].isspace() and ':' in stripped:
if stripped in headers_to_remove:
skip_block = True
continue
else:
skip_block = False
if skip_block:
if not stripped:
skip_block = False
continue
elif line[0:1].isspace():
continue
else:
skip_block = False
cleaned.append(line)
return ''.join(cleaned)
def _build_webhook_fallback():
    """Return the manual config steps a user can apply by hand when we
    cannot write the PVE notification config files ourselves."""
    import base64
    body_tpl = '{"title":"{{ escape title }}","message":"{{ escape message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}","fields":{{ json fields }}}'
    encoded_body = base64.b64encode(body_tpl.encode()).decode()
    steps = [
        "# 1. Append to END of /etc/pve/notifications.cfg",
        "# (do NOT delete existing content):",
        "",
        f"webhook: {_PVE_ENDPOINT_ID}",
        f"\tbody {encoded_body}",
        "\tmethod post",
        f"\turl {_PVE_WEBHOOK_URL}",
        "",
        f"matcher: {_PVE_MATCHER_ID}",
        f"\ttarget {_PVE_ENDPOINT_ID}",
        "\tmode all",
        "",
        "# 2. Append to /etc/pve/priv/notifications.cfg :",
        f"webhook: {_PVE_ENDPOINT_ID}",
    ]
    return steps
def setup_pve_webhook_core() -> dict:
    """Core logic to configure PVE webhook. Callable from anywhere.

    Returns dict with 'configured', 'error', 'fallback_commands' keys; on
    success also includes 'secret'.  Idempotent: safe to call multiple
    times — previously written blocks are removed before re-appending.
    On any failure, 'fallback_commands' holds manual steps for the user.
    """
    import secrets as secrets_mod
    result = {
        'configured': False,
        'endpoint_id': _PVE_ENDPOINT_ID,
        'matcher_id': _PVE_MATCHER_ID,
        'url': _PVE_WEBHOOK_URL,
        'fallback_commands': [],
        'error': None,
    }
    try:
        # ── Step 1: Ensure webhook secret exists (for our own internal use) ──
        secret = notification_manager.get_webhook_secret()
        if not secret:
            secret = secrets_mod.token_urlsafe(32)
            notification_manager._save_setting('webhook_secret', secret)
        # ── Step 2: Read main config ──
        cfg_text, err = _pve_read_file(_PVE_NOTIFICATIONS_CFG)
        if err:
            result['error'] = err
            result['fallback_commands'] = _build_webhook_fallback()
            return result
        # ── Step 3: Read priv config (to clean up any broken blocks we wrote before) ──
        # priv_text becomes None when unreadable; later steps treat None as
        # "start the priv file from scratch with only our block".
        priv_text, err = _pve_read_file(_PVE_PRIV_CFG)
        if err:
            priv_text = None
        # ── Step 4: Create backups before ANY modification ──
        _pve_backup_file(_PVE_NOTIFICATIONS_CFG)
        if priv_text is not None:
            _pve_backup_file(_PVE_PRIV_CFG)
        # ── Step 5: Remove any previous proxmenux blocks from BOTH files ──
        cleaned_cfg = _pve_remove_our_blocks(cfg_text, _PVE_OUR_HEADERS)
        if priv_text is not None:
            cleaned_priv = _pve_remove_our_blocks(priv_text, _PVE_OUR_HEADERS)
        # ── Step 6: Build new blocks ──
        # Exact format from a real working PVE server:
        #   webhook: name
        #   \tmethod post
        #   \turl http://...
        # NO header lines -- localhost webhook doesn't need them.
        # PVE header format is:  header name=X-Key,value=<base64>
        # PVE secret format is:  secret name=key,value=<base64>
        # Neither is needed for localhost calls.
        # PVE stores body as base64 in the config file.
        # {{ escape title/message }} -- JSON-safe escaping of quotes/newlines.
        # {{ json fields }} -- renders ALL PVE metadata as a JSON object
        # (type, hostname, job-id). This is a single Handlebars helper
        # that always works, even if fields is empty (renders {}).
        import base64
        body_template = '{"title":"{{ escape title }}","message":"{{ escape message }}","severity":"{{ severity }}","timestamp":"{{ timestamp }}","fields":{{ json fields }}}'
        body_b64 = base64.b64encode(body_template.encode()).decode()
        endpoint_block = (
            f"webhook: {_PVE_ENDPOINT_ID}\n"
            f"\tbody {body_b64}\n"
            f"\tmethod post\n"
            f"\turl {_PVE_WEBHOOK_URL}\n"
        )
        matcher_block = (
            f"matcher: {_PVE_MATCHER_ID}\n"
            f"\ttarget {_PVE_ENDPOINT_ID}\n"
            f"\tmode all\n"
        )
        # ── Step 7: Append our blocks to cleaned main config ──
        # Ensure exactly one blank line separates existing content from ours.
        if cleaned_cfg and not cleaned_cfg.endswith('\n'):
            cleaned_cfg += '\n'
        if cleaned_cfg and not cleaned_cfg.endswith('\n\n'):
            cleaned_cfg += '\n'
        new_cfg = cleaned_cfg + endpoint_block + '\n' + matcher_block
        # ── Step 8: Write main config ──
        try:
            with open(_PVE_NOTIFICATIONS_CFG, 'w') as f:
                f.write(new_cfg)
        except PermissionError:
            result['error'] = f'Permission denied writing {_PVE_NOTIFICATIONS_CFG}'
            result['fallback_commands'] = _build_webhook_fallback()
            return result
        except Exception as e:
            # Best-effort rollback to the original content on partial write.
            try:
                with open(_PVE_NOTIFICATIONS_CFG, 'w') as f:
                    f.write(cfg_text)
            except Exception:
                pass
            result['error'] = str(e)
            result['fallback_commands'] = _build_webhook_fallback()
            return result
        # ── Step 9: Write priv config with our webhook entry ──
        # PVE REQUIRES a matching block in priv/notifications.cfg for every
        # webhook endpoint, even if it has no secrets. Without it PVE throws:
        #   "Could not instantiate endpoint: private config does not exist"
        priv_block = (
            f"webhook: {_PVE_ENDPOINT_ID}\n"
        )
        if priv_text is not None:
            # Start from cleaned priv (our old blocks removed)
            if cleaned_priv and not cleaned_priv.endswith('\n'):
                cleaned_priv += '\n'
            if cleaned_priv and not cleaned_priv.endswith('\n\n'):
                cleaned_priv += '\n'
            new_priv = cleaned_priv + priv_block
        else:
            new_priv = priv_block
        try:
            with open(_PVE_PRIV_CFG, 'w') as f:
                f.write(new_priv)
        except PermissionError:
            result['error'] = f'Permission denied writing {_PVE_PRIV_CFG}'
            result['fallback_commands'] = _build_webhook_fallback()
            return result
        except Exception:
            pass  # best-effort: main config already written successfully
        result['configured'] = True
        result['secret'] = secret
        return result
    except Exception as e:
        result['error'] = str(e)
        result['fallback_commands'] = _build_webhook_fallback()
        return result
@notification_bp.route('/api/notifications/proxmox/setup-webhook', methods=['POST'])
def setup_proxmox_webhook():
    """Thin HTTP wrapper around setup_pve_webhook_core()."""
    outcome = setup_pve_webhook_core()
    return jsonify(outcome), 200
def cleanup_pve_webhook_core() -> dict:
    """Core logic to remove PVE webhook blocks. Callable from anywhere.

    Returns dict with 'cleaned', 'error' keys.
    Only removes blocks named 'proxmenux-webhook' / 'proxmenux-default';
    all other config content is preserved.  No-op (still 'cleaned': True)
    when none of our blocks are present.
    """
    result = {'cleaned': False, 'error': None}
    try:
        # Read both files
        cfg_text, err = _pve_read_file(_PVE_NOTIFICATIONS_CFG)
        if err:
            result['error'] = err
            return result
        priv_text, err = _pve_read_file(_PVE_PRIV_CFG)
        if err:
            priv_text = None
        # Check if our blocks actually exist before doing anything
        has_our_blocks = any(
            h in cfg_text for h in [f'webhook: {_PVE_ENDPOINT_ID}', f'matcher: {_PVE_MATCHER_ID}']
        )
        has_priv_blocks = priv_text and f'webhook: {_PVE_ENDPOINT_ID}' in priv_text
        if not has_our_blocks and not has_priv_blocks:
            result['cleaned'] = True
            return result
        # Backup before modification
        _pve_backup_file(_PVE_NOTIFICATIONS_CFG)
        if priv_text is not None:
            _pve_backup_file(_PVE_PRIV_CFG)
        # Remove our blocks from the main config
        if has_our_blocks:
            cleaned_cfg = _pve_remove_our_blocks(cfg_text, _PVE_OUR_HEADERS)
            try:
                with open(_PVE_NOTIFICATIONS_CFG, 'w') as f:
                    f.write(cleaned_cfg)
            except PermissionError:
                result['error'] = f'Permission denied writing {_PVE_NOTIFICATIONS_CFG}'
                return result
            except Exception as e:
                # Rollback to the original content on partial write
                try:
                    with open(_PVE_NOTIFICATIONS_CFG, 'w') as f:
                        f.write(cfg_text)
                except Exception:
                    pass
                result['error'] = str(e)
                return result
        # Remove our entry from the priv config (best-effort only)
        if has_priv_blocks and priv_text is not None:
            cleaned_priv = _pve_remove_our_blocks(priv_text, _PVE_OUR_HEADERS)
            try:
                with open(_PVE_PRIV_CFG, 'w') as f:
                    f.write(cleaned_priv)
            except Exception:
                pass  # Best-effort
        result['cleaned'] = True
        return result
    except Exception as e:
        result['error'] = str(e)
        return result
@notification_bp.route('/api/notifications/proxmox/cleanup-webhook', methods=['POST'])
def cleanup_proxmox_webhook():
    """Thin HTTP wrapper around cleanup_pve_webhook_core()."""
    outcome = cleanup_pve_webhook_core()
    return jsonify(outcome), 200
@notification_bp.route('/api/notifications/proxmox/read-cfg', methods=['GET'])
def read_pve_notification_cfg():
    """Diagnostic: return raw content of PVE notification config files.

    GET /api/notifications/proxmox/read-cfg
    Returns both notifications.cfg and priv/notifications.cfg content,
    plus the content of any proxmenux_backup files found alongside them.
    Per-file read failures are reported in the 'error' field, never raised.
    """
    import os
    files = {
        'notifications_cfg': '/etc/pve/notifications.cfg',
        'priv_cfg': '/etc/pve/priv/notifications.cfg',
    }
    # Also look for any backups we created
    backup_dir = '/etc/pve'
    priv_backup_dir = '/etc/pve/priv'
    result = {}
    for key, path in files.items():
        try:
            with open(path, 'r') as f:
                result[key] = {
                    'path': path,
                    'content': f.read(),
                    'size': os.path.getsize(path),
                    'error': None,
                }
        except FileNotFoundError:
            result[key] = {'path': path, 'content': None, 'size': 0, 'error': 'file_not_found'}
        except PermissionError:
            result[key] = {'path': path, 'content': None, 'size': 0, 'error': 'permission_denied'}
        except Exception as e:
            result[key] = {'path': path, 'content': None, 'size': 0, 'error': str(e)}
    # Find backups (best-effort; unreadable directories are skipped)
    backups = []
    for d in [backup_dir, priv_backup_dir]:
        try:
            for fname in sorted(os.listdir(d)):
                if 'proxmenux_backup' in fname:
                    fpath = os.path.join(d, fname)
                    try:
                        with open(fpath, 'r') as f:
                            backups.append({
                                'path': fpath,
                                'content': f.read(),
                                'size': os.path.getsize(fpath),
                            })
                    except Exception:
                        backups.append({'path': fpath, 'content': None, 'error': 'read_failed'})
        except Exception:
            pass
    result['backups'] = backups
    return jsonify(result), 200
@notification_bp.route('/api/notifications/proxmox/restore-cfg', methods=['POST'])
def restore_pve_notification_cfg():
    """Restore PVE notification config from our backup.

    POST /api/notifications/proxmox/restore-cfg
    Finds the most recent proxmenux_backup for each config file and
    restores it over the live file.  Responds with which targets were
    restored and which failed; 'success' requires at least one restore
    and zero errors.
    """
    import os
    import shutil
    # directory to search -> live config file to overwrite
    files_to_restore = {
        '/etc/pve': '/etc/pve/notifications.cfg',
        '/etc/pve/priv': '/etc/pve/priv/notifications.cfg',
    }
    restored = []
    errors = []
    for search_dir, target_path in files_to_restore.items():
        try:
            # Backup names end in a YYYYMMDD_HHMMSS stamp, so reverse
            # lexicographic order puts the most recent one first.
            candidates = sorted([
                f for f in os.listdir(search_dir)
                if 'proxmenux_backup' in f and f.startswith('notifications.cfg')
            ], reverse=True)
            if candidates:
                backup_path = os.path.join(search_dir, candidates[0])
                shutil.copy2(backup_path, target_path)
                restored.append({'target': target_path, 'from_backup': backup_path})
            else:
                errors.append({'target': target_path, 'error': 'no_backup_found'})
        except Exception as e:
            errors.append({'target': target_path, 'error': str(e)})
    return jsonify({
        'restored': restored,
        'errors': errors,
        'success': len(errors) == 0 and len(restored) > 0,
    }), 200
@notification_bp.route('/api/notifications/webhook', methods=['POST'])
def proxmox_webhook():
    """Receive native Proxmox VE notification webhooks (hardened).

    Security layers:
      Localhost (127.0.0.1 / ::1): rate limiting only.
        PVE calls us on localhost and cannot send custom auth headers,
        so we trust the loopback interface (only local processes can reach it).
      Remote: rate limiting + shared secret + timestamp + replay + IP allowlist.

    Always returns HTTP 200 for processed/failed payloads (a non-200 makes
    PVE mark the webhook as broken); the JSON 'accepted' field carries the
    real outcome.  Rejections by the security layers use real error codes.
    """
    # Uniform rejection response.  (Was a 3-arg lambda whose first argument
    # was unused and merely duplicated the status at every call site.)
    def _reject(error, status):
        return jsonify({'accepted': False, 'error': error}), status

    client_ip = request.remote_addr or ''
    is_localhost = client_ip in ('127.0.0.1', '::1')
    # ── Layer 1: Rate limiting (always) ──
    if not _webhook_limiter.allow():
        resp = jsonify({'accepted': False, 'error': 'rate_limited'})
        resp.headers['Retry-After'] = '60'
        return resp, 429
    # ── Layers 2-5: Remote-only checks ──
    if not is_localhost:
        # Layer 2: Shared secret (constant-time compare, only if configured)
        try:
            configured_secret = notification_manager.get_webhook_secret()
        except Exception:
            configured_secret = ''
        if configured_secret:
            request_secret = request.headers.get('X-Webhook-Secret', '')
            if not request_secret:
                return _reject('missing_secret', 401)
            if not hmac.compare_digest(configured_secret, request_secret):
                return _reject('invalid_secret', 401)
        # Layer 3: Anti-replay timestamp within the allowed clock drift
        ts_header = request.headers.get('X-ProxMenux-Timestamp', '')
        if not ts_header:
            return _reject('missing_timestamp', 401)
        try:
            ts_value = int(ts_header)
        except (ValueError, TypeError):
            return _reject('invalid_timestamp', 401)
        if abs(time.time() - ts_value) > _TIMESTAMP_MAX_DRIFT:
            return _reject('timestamp_expired', 401)
        # Layer 4: Replay cache keyed on hash(timestamp + raw body)
        raw_body = request.get_data(as_text=True) or ''
        signature = hashlib.sha256(f"{ts_value}:{raw_body}".encode(errors='replace')).hexdigest()
        if _replay_cache.check_and_record(signature):
            return _reject('replay_detected', 409)
        # Layer 5: IP allowlist (best-effort; empty allowlist permits all)
        try:
            allowed_ips = notification_manager.get_webhook_allowed_ips()
            if allowed_ips and client_ip not in allowed_ips:
                return _reject('forbidden_ip', 403)
        except Exception:
            pass
    # ── Parse and process payload ──
    try:
        raw_data = request.get_data(as_text=True) or ''
        # Try JSON first
        payload = request.get_json(silent=True) or {}
        # If not JSON, try form data
        if not payload:
            payload = dict(request.form)
        # If still empty, try parsing raw data as JSON (PVE may not set Content-Type)
        if not payload and raw_data:
            import json
            try:
                payload = json.loads(raw_data)
            except (json.JSONDecodeError, ValueError):
                # PVE's {{ message }} may contain unescaped newlines/quotes
                # that break JSON. Try to repair common issues.
                try:
                    repaired = raw_data.replace('\n', '\\n').replace('\r', '\\r')
                    payload = json.loads(repaired)
                except (json.JSONDecodeError, ValueError):
                    # Try to extract fields with regex from broken JSON
                    import re
                    title_m = re.search(r'"title"\s*:\s*"([^"]*)"', raw_data)
                    sev_m = re.search(r'"severity"\s*:\s*"([^"]*)"', raw_data)
                    if title_m:
                        payload = {
                            'title': title_m.group(1),
                            'body': raw_data[:1000],
                            'severity': sev_m.group(1) if sev_m else 'info',
                            'source': 'proxmox_hook',
                        }
        # If still empty, try to salvage data from raw body
        if not payload:
            if raw_data:
                # Last resort: treat raw text as the message body
                payload = {
                    'title': 'PVE Notification',
                    'body': raw_data[:1000],
                    'severity': 'info',
                    'source': 'proxmox_hook',
                }
            else:
                return _reject('empty_payload', 400)
        result = notification_manager.process_webhook(payload)
        # Always return 200 to PVE -- a non-200 makes PVE report the webhook as broken.
        # The 'accepted' field in the JSON body indicates actual processing status.
        return jsonify(result), 200
    except Exception as e:
        # Still return 200 to avoid PVE flagging the webhook as broken
        return jsonify({'accepted': False, 'error': 'internal_error', 'detail': str(e)}), 200

View File

@@ -23,7 +23,6 @@ import time
import threading
import urllib.parse
import hardware_monitor
import health_persistence
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from functools import wraps
@@ -47,8 +46,6 @@ from flask_health_routes import health_bp # noqa: E402
from flask_auth_routes import auth_bp # noqa: E402
from flask_proxmenux_routes import proxmenux_bp # noqa: E402
from flask_security_routes import security_bp # noqa: E402
from flask_notification_routes import notification_bp # noqa: E402
from notification_manager import notification_manager # noqa: E402
from jwt_middleware import require_auth # noqa: E402
import auth_manager # noqa: E402
@@ -123,7 +120,6 @@ app.register_blueprint(auth_bp)
app.register_blueprint(health_bp)
app.register_blueprint(proxmenux_bp)
app.register_blueprint(security_bp)
app.register_blueprint(notification_bp)
# Initialize terminal / WebSocket routes
init_terminal_routes(app)
@@ -1160,66 +1156,19 @@ def get_storage_info():
'ssd_life_left': smart_data.get('ssd_life_left') # Added
}
storage_data['disk_count'] += 1
health = smart_data.get('health', 'unknown').lower()
if health == 'healthy':
storage_data['healthy_disks'] += 1
elif health == 'warning':
storage_data['warning_disks'] += 1
elif health in ['critical', 'failed']:
storage_data['critical_disks'] += 1
except Exception as e:
# print(f"Error getting disk list: {e}")
pass
# Enrich physical disks with active I/O errors from health_persistence.
# This is the single source of truth -- health_monitor detects ATA/SCSI/IO
# errors via dmesg, records them in health_persistence, and we read them here.
try:
active_disk_errors = health_persistence.get_active_errors(category='disks')
for err in active_disk_errors:
details = err.get('details', {})
if isinstance(details, str):
try:
details = json.loads(details)
except (json.JSONDecodeError, TypeError):
details = {}
err_device = details.get('disk', '')
error_count = details.get('error_count', 0)
sample = details.get('sample', '')
severity = err.get('severity', 'WARNING')
# Match error to physical disk.
# err_device can be 'sda', 'nvme0n1', or 'ata8' (if resolution failed)
matched_disk = None
if err_device in physical_disks:
matched_disk = err_device
else:
# Try partial match: 'sda' matches disk 'sda'
for dk in physical_disks:
if dk == err_device or err_device.startswith(dk):
matched_disk = dk
break
if matched_disk:
physical_disks[matched_disk]['io_errors'] = {
'count': error_count,
'severity': severity,
'sample': sample,
'reason': err.get('reason', ''),
}
# Override health status if I/O errors are more severe
current_health = physical_disks[matched_disk].get('health', 'unknown').lower()
if severity == 'CRITICAL' and current_health != 'critical':
physical_disks[matched_disk]['health'] = 'critical'
elif severity == 'WARNING' and current_health in ('healthy', 'unknown'):
physical_disks[matched_disk]['health'] = 'warning'
except Exception:
pass
# Count disk health states AFTER I/O error enrichment
for disk_name, disk_info in physical_disks.items():
storage_data['disk_count'] += 1
health = disk_info.get('health', 'unknown').lower()
if health == 'healthy':
storage_data['healthy_disks'] += 1
elif health == 'warning':
storage_data['warning_disks'] += 1
elif health in ['critical', 'failed']:
storage_data['critical_disks'] += 1
storage_data['total'] = round(total_disk_size_bytes / (1024**4), 1)
# Get disk usage for mounted partitions
@@ -7145,16 +7094,6 @@ if __name__ == '__main__':
except Exception as e:
print(f"[ProxMenux] Vital signs sampler failed to start: {e}")
# ── Notification Service ──
try:
notification_manager.start()
if notification_manager._enabled:
print(f"[ProxMenux] Notification service started (channels: {list(notification_manager._channels.keys())})")
else:
print("[ProxMenux] Notification service loaded (disabled - configure in Settings)")
except Exception as e:
print(f"[ProxMenux] Notification service failed to start: {e}")
# Check for SSL configuration
ssl_ctx = None
try:

View File

@@ -324,13 +324,6 @@ class HealthMonitor:
Returns JSON structure with ALL 10 categories always present.
Now includes persistent error tracking.
"""
# Run cleanup on every status check so stale errors are auto-resolved
# using the user-configured Suppression Duration (single source of truth).
try:
health_persistence.cleanup_old_errors()
except Exception:
pass
active_errors = health_persistence.get_active_errors()
# No need to create persistent_issues dict here, it's implicitly handled by the checks
@@ -828,20 +821,8 @@ class HealthMonitor:
issues = []
storage_details = {}
# Check disk usage and mount status for important mounts.
# We detect actual mountpoints dynamically rather than hard-coding.
critical_mounts = set()
critical_mounts.add('/')
try:
for part in psutil.disk_partitions(all=False):
mp = part.mountpoint
# Include standard system mounts and PVE storage
if mp in ('/', '/var', '/tmp', '/boot', '/boot/efi') or \
mp.startswith('/var/lib/vz') or mp.startswith('/mnt/'):
critical_mounts.add(mp)
except Exception:
pass
critical_mounts = sorted(critical_mounts)
# Check disk usage and mount status first for critical mounts
critical_mounts = ['/']
for mount_point in critical_mounts:
try:
@@ -876,32 +857,9 @@ class HealthMonitor:
# Check filesystem usage only if not already flagged as critical
if mount_point not in storage_details or storage_details[mount_point].get('status') == 'OK':
fs_status = self._check_filesystem(mount_point)
error_key = f'disk_space_{mount_point}'
if fs_status['status'] != 'OK':
issues.append(f"{mount_point}: {fs_status['reason']}")
storage_details[mount_point] = fs_status
# Record persistent error for notifications
usage = psutil.disk_usage(mount_point)
avail_gb = usage.free / (1024**3)
if avail_gb >= 1:
avail_str = f"{avail_gb:.1f} GiB"
else:
avail_str = f"{usage.free / (1024**2):.0f} MiB"
health_persistence.record_error(
error_key=error_key,
category='disk',
severity=fs_status['status'],
reason=f'{mount_point}: {fs_status["reason"]}',
details={
'mount': mount_point,
'used': str(round(usage.percent, 1)),
'available': avail_str,
'dismissable': False,
}
)
else:
# Space recovered -- clear any previous alert
health_persistence.clear_error(error_key)
except Exception:
pass # Silently skip if mountpoint check fails
@@ -1094,67 +1052,16 @@ class HealthMonitor:
return storages
def _resolve_ata_to_disk(self, ata_port: str) -> str:
"""Resolve an ATA controller name (e.g. 'ata8') to a block device (e.g. 'sda').
Uses /sys/class/ata_port/ symlinks and /sys/block/ to find the mapping.
Falls back to parsing dmesg for 'ata8: SATA link up' -> 'sd 7:0:0:0: [sda]'.
"""
if not ata_port or not ata_port.startswith('ata'):
return ata_port
port_num = ata_port.replace('ata', '')
# Method 1: Walk /sys/class/ata_port/ -> host -> target -> block
try:
ata_path = f'/sys/class/ata_port/{ata_port}'
if os.path.exists(ata_path):
device_path = os.path.realpath(ata_path)
# Walk up to find the SCSI host, then find block devices
# Path: /sys/devices/.../ataX/hostY/targetY:0:0/Y:0:0:0/block/sdZ
for root, dirs, files in os.walk(os.path.dirname(device_path)):
if 'block' in dirs:
block_path = os.path.join(root, 'block')
devs = os.listdir(block_path)
if devs:
return devs[0] # e.g. 'sda'
except (OSError, IOError):
pass
# Method 2: Parse dmesg for ATA link messages
try:
result = subprocess.run(
['dmesg', '--notime'],
capture_output=True, text=True, timeout=2
)
if result.returncode == 0:
# Look for "ata8: SATA link up" followed by "sd X:0:0:0: [sda]"
lines = result.stdout.split('\n')
host_num = None
for line in lines:
m = re.search(rf'{ata_port}:\s+SATA link', line)
if m:
# ata port number maps to host(N-1) typically
host_num = int(port_num) - 1
if host_num is not None:
m2 = re.search(rf'sd\s+{host_num}:\d+:\d+:\d+:\s+\[(\w+)\]', line)
if m2:
return m2.group(1)
except (OSError, subprocess.TimeoutExpired):
pass
return ata_port # Return original if resolution fails
def _check_disks_optimized(self) -> Dict[str, Any]:
"""
Disk I/O error check -- the SINGLE source of truth for disk errors.
Reads dmesg for I/O/ATA/SCSI errors, counts per device, records in
health_persistence, and returns status for the health dashboard.
Resolves ATA controller names (ata8) to physical disks (sda).
Optimized disk check - always returns status.
Checks dmesg for I/O errors and SMART status.
NOTE: This function is now largely covered by _check_storage_optimized,
but kept for potential specific disk-level reporting if needed.
Currently, its primary function is to detect recent I/O errors.
"""
current_time = time.time()
disk_results = {} # Single dict for both WARNING and CRITICAL
disk_issues = {}
try:
# Check dmesg for I/O errors in the last 5 minutes
@@ -1165,52 +1072,17 @@ class HealthMonitor:
timeout=2
)
# Collect a sample line per device for richer error messages
disk_samples = {}
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
# Detect various disk error formats
is_disk_error = any(kw in line_lower for kw in [
'i/o error', 'scsi error', 'medium error',
'failed command:', 'exception emask',
])
ata_match = re.search(r'(ata\d+)[\.\d]*:.*(?:error|failed|exception)', line_lower)
if ata_match:
is_disk_error = True
if is_disk_error:
# Extract device from multiple formats
raw_device = None
for dev_re in [
r'dev\s+(sd[a-z]+)', # dev sdb
r'\[(sd[a-z]+)\]', # [sda]
r'/dev/(sd[a-z]+)', # /dev/sda
r'(nvme\d+n\d+)', # nvme0n1
r'device\s+(sd[a-z]+\d*)', # device sda1
r'(ata\d+)', # ata8 (ATA controller)
]:
dm = re.search(dev_re, line)
if dm:
raw_device = dm.group(1)
break
if raw_device:
# Resolve ATA port to physical disk name
if raw_device.startswith('ata'):
resolved = self._resolve_ata_to_disk(raw_device)
disk_name = resolved
else:
disk_name = raw_device.rstrip('0123456789') if raw_device.startswith('sd') else raw_device
if any(keyword in line_lower for keyword in ['i/o error', 'ata error', 'scsi error', 'medium error']):
# Try to extract disk name
disk_match = re.search(r'/dev/(sd[a-z]|nvme\d+n\d+)', line)
if disk_match:
disk_name = disk_match.group(1)
self.io_error_history[disk_name].append(current_time)
if disk_name not in disk_samples:
# Clean the sample: strip dmesg timestamp prefix
clean = re.sub(r'^\[.*?\]\s*', '', line.strip())
disk_samples[disk_name] = clean[:200]
# Clean old history and evaluate per-disk status
# Clean old history (keep errors from the last 5 minutes)
for disk in list(self.io_error_history.keys()):
self.io_error_history[disk] = [
t for t in self.io_error_history[disk]
@@ -1218,67 +1090,57 @@ class HealthMonitor:
]
error_count = len(self.io_error_history[disk])
error_key = f'disk_{disk}'
sample = disk_samples.get(disk, '')
display = f'/dev/{disk}' if not disk.startswith('/') else disk
# Report based on recent error count
if error_count >= 3:
error_key = f'disk_{disk}'
severity = 'CRITICAL'
reason = f'{display}: {error_count} I/O errors in 5 min'
if sample:
reason += f'\n{sample}'
reason = f'{error_count} I/O errors in 5 minutes'
health_persistence.record_error(
error_key=error_key,
category='disks',
severity=severity,
reason=reason,
details={'disk': disk, 'device': display,
'error_count': error_count,
'sample': sample, 'dismissable': False}
details={'disk': disk, 'error_count': error_count, 'dismissable': False}
)
disk_results[display] = {
disk_details[disk] = {
'status': severity,
'reason': reason,
'device': disk,
'error_count': error_count,
'dismissable': False,
'dismissable': False
}
elif error_count >= 1:
error_key = f'disk_{disk}'
severity = 'WARNING'
reason = f'{display}: {error_count} I/O error(s) in 5 min'
if sample:
reason += f'\n{sample}'
reason = f'{error_count} I/O error(s) in 5 minutes'
rec_result = health_persistence.record_error(
health_persistence.record_error(
error_key=error_key,
category='disks',
severity=severity,
reason=reason,
details={'disk': disk, 'device': display,
'error_count': error_count,
'sample': sample, 'dismissable': True}
details={'disk': disk, 'error_count': error_count, 'dismissable': True}
)
if not rec_result or rec_result.get('type') != 'skipped_acknowledged':
disk_results[display] = {
'status': severity,
'reason': reason,
'device': disk,
'error_count': error_count,
'dismissable': True,
}
disk_issues[f'/dev/{disk}'] = {
'status': severity,
'reason': reason,
'dismissable': True
}
else:
error_key = f'disk_{disk}'
health_persistence.resolve_error(error_key, 'Disk errors cleared')
if not disk_results:
if not disk_issues:
return {'status': 'OK'}
has_critical = any(d.get('status') == 'CRITICAL' for d in disk_results.values())
has_critical = any(d.get('status') == 'CRITICAL' for d in disk_issues.values())
return {
'status': 'CRITICAL' if has_critical else 'WARNING',
'reason': f"{len(disk_results)} disk(s) with recent errors",
'details': disk_results
'reason': f"{len(disk_issues)} disk(s) with recent errors",
'details': disk_issues
}
except Exception as e:
@@ -1489,51 +1351,12 @@ class HealthMonitor:
except Exception:
return {'status': 'UNKNOWN', 'reason': 'Ping command failed'}
def _is_vzdump_active(self) -> bool:
"""Check if a vzdump (backup) job is currently running."""
try:
with open('/var/log/pve/tasks/active', 'r') as f:
for line in f:
if ':vzdump:' in line:
return True
except (OSError, IOError):
pass
return False
def _resolve_vm_name(self, vmid: str) -> str:
"""Resolve VMID to guest name from PVE config files."""
if not vmid:
return ''
for base in ['/etc/pve/qemu-server', '/etc/pve/lxc']:
conf = os.path.join(base, f'{vmid}.conf')
try:
with open(conf) as f:
for line in f:
if line.startswith('hostname:') or line.startswith('name:'):
return line.split(':', 1)[1].strip()
except (OSError, IOError):
continue
return ''
def _check_vms_cts_optimized(self) -> Dict[str, Any]:
"""
Optimized VM/CT check - detects qmp failures and startup errors from logs.
Improved detection of container and VM errors from journalctl.
"""
try:
# First: auto-resolve any persisted VM/CT errors where the guest
# is now running. This clears stale "Failed to start" / QMP
# errors that are no longer relevant.
try:
active_vm_errors = health_persistence.get_active_errors('vms')
for err in active_vm_errors:
details = err.get('details') or {}
vmid = details.get('id', '')
if vmid:
health_persistence.check_vm_running(vmid)
except Exception:
pass
issues = []
vm_details = {}
@@ -1544,28 +1367,20 @@ class HealthMonitor:
timeout=3
)
# Check if vzdump is running -- QMP timeouts during backup are normal
_vzdump_running = self._is_vzdump_active()
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command.*(?:failed|unable|timeout)', line_lower)
if vm_qmp_match:
if _vzdump_running:
continue # Normal during backup
vmid = vm_qmp_match.group(1)
vm_name = self._resolve_vm_name(vmid)
display = f"VM {vmid} ({vm_name})" if vm_name else f"VM {vmid}"
key = f'vm_{vmid}'
if key not in vm_details:
issues.append(f'{display}: QMP communication issue')
issues.append(f'VM {vmid}: Communication issue')
vm_details[key] = {
'status': 'WARNING',
'reason': f'{display}: QMP command failed or timed out.\n{line.strip()[:200]}',
'reason': 'QMP command timeout',
'id': vmid,
'vmname': vm_name,
'type': 'VM'
}
continue
@@ -1586,15 +1401,11 @@ class HealthMonitor:
else:
reason = 'Container error'
ct_name = self._resolve_vm_name(ctid)
display = f"CT {ctid} ({ct_name})" if ct_name else f"CT {ctid}"
full_reason = f'{display}: {reason}\n{line.strip()[:200]}'
issues.append(f'{display}: {reason}')
issues.append(f'CT {ctid}: {reason}')
vm_details[key] = {
'status': 'WARNING' if 'device' in reason.lower() else 'CRITICAL',
'reason': full_reason,
'reason': reason,
'id': ctid,
'vmname': ct_name,
'type': 'CT'
}
continue
@@ -1629,15 +1440,11 @@ class HealthMonitor:
vmid = id_match.group(1)
key = f'vmct_{vmid}'
if key not in vm_details:
vm_name = self._resolve_vm_name(vmid)
display = f"VM/CT {vmid} ({vm_name})" if vm_name else f"VM/CT {vmid}"
full_reason = f'{display}: Failed to start\n{line.strip()[:200]}'
issues.append(f'{display}: Failed to start')
issues.append(f'VM/CT {vmid}: Failed to start')
vm_details[key] = {
'status': 'CRITICAL',
'reason': full_reason,
'reason': 'Failed to start',
'id': vmid,
'vmname': vm_name,
'type': 'VM/CT'
}
@@ -1697,38 +1504,31 @@ class HealthMonitor:
timeout=3
)
_vzdump_running = self._is_vzdump_active()
if result.returncode == 0:
for line in result.stdout.split('\n'):
line_lower = line.lower()
# VM QMP errors (skip during active backup -- normal behavior)
# VM QMP errors
vm_qmp_match = re.search(r'vm\s+(\d+)\s+qmp\s+command.*(?:failed|unable|timeout)', line_lower)
if vm_qmp_match:
if _vzdump_running:
continue # Normal during backup
vmid = vm_qmp_match.group(1)
vm_name = self._resolve_vm_name(vmid)
display = f"VM {vmid} ({vm_name})" if vm_name else f"VM {vmid}"
error_key = f'vm_{vmid}'
if error_key not in vm_details:
rec_result = health_persistence.record_error(
# Record persistent error
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='WARNING',
reason=f'{display}: QMP command failed or timed out.\n{line.strip()[:200]}',
details={'id': vmid, 'vmname': vm_name, 'type': 'VM'}
reason='QMP command timeout',
details={'id': vmid, 'type': 'VM'}
)
if not rec_result or rec_result.get('type') != 'skipped_acknowledged':
issues.append(f'{display}: QMP communication issue')
vm_details[error_key] = {
'status': 'WARNING',
'reason': f'{display}: QMP command failed or timed out',
'id': vmid,
'vmname': vm_name,
'type': 'VM'
}
issues.append(f'VM {vmid}: Communication issue')
vm_details[error_key] = {
'status': 'WARNING',
'reason': 'QMP command timeout',
'id': vmid,
'type': 'VM'
}
continue
# Container errors (including startup issues via vzstart)
@@ -1748,21 +1548,20 @@ class HealthMonitor:
reason = 'Startup error'
# Record persistent error
rec_result = health_persistence.record_error(
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='WARNING',
reason=reason,
details={'id': ctid, 'type': 'CT'}
)
if not rec_result or rec_result.get('type') != 'skipped_acknowledged':
issues.append(f'CT {ctid}: {reason}')
vm_details[error_key] = {
'status': 'WARNING',
'reason': reason,
'id': ctid,
'type': 'CT'
}
issues.append(f'CT {ctid}: {reason}')
vm_details[error_key] = {
'status': 'WARNING',
'reason': reason,
'id': ctid,
'type': 'CT'
}
# Generic failed to start for VMs and CTs
if any(keyword in line_lower for keyword in ['failed to start', 'cannot start', 'activation failed', 'start error']):
@@ -1787,28 +1586,22 @@ class HealthMonitor:
vm_type = 'VM/CT'
if error_key not in vm_details:
vm_name = self._resolve_vm_name(vmid_ctid)
display = f"{vm_type} {vmid_ctid}"
if vm_name:
display = f"{vm_type} {vmid_ctid} ({vm_name})"
reason = f'{display}: Failed to start\n{line.strip()[:200]}'
reason = 'Failed to start'
# Record persistent error
rec_result = health_persistence.record_error(
health_persistence.record_error(
error_key=error_key,
category='vms',
severity='CRITICAL',
reason=reason,
details={'id': vmid_ctid, 'vmname': vm_name, 'type': vm_type}
details={'id': vmid_ctid, 'type': vm_type}
)
if not rec_result or rec_result.get('type') != 'skipped_acknowledged':
issues.append(f'{display}: Failed to start')
vm_details[error_key] = {
'status': 'CRITICAL',
'reason': reason,
'id': vmid_ctid,
'vmname': vm_name,
'type': vm_type
}
issues.append(f'{vm_type} {vmid_ctid}: {reason}')
vm_details[error_key] = {
'status': 'CRITICAL',
'reason': reason,
'id': vmid_ctid,
'type': vm_type
}
# Build checks dict from vm_details
checks = {}
@@ -1899,23 +1692,16 @@ class HealthMonitor:
if failed_services:
reason = f'Services inactive: {", ".join(failed_services)}'
# Record each failed service in persistence, respecting dismiss
active_failed = []
# Record each failed service in persistence
for svc in failed_services:
error_key = f'pve_service_{svc}'
rec_result = health_persistence.record_error(
health_persistence.record_error(
error_key=error_key,
category='pve_services',
severity='CRITICAL',
reason=f'PVE service {svc} is {service_details.get(svc, "inactive")}',
details={'service': svc, 'state': service_details.get(svc, 'inactive')}
)
if rec_result and rec_result.get('type') == 'skipped_acknowledged':
# Mark as dismissed in checks for frontend
if svc in checks:
checks[svc]['dismissed'] = True
else:
active_failed.append(svc)
# Auto-clear services that recovered
for svc in services_to_check:
@@ -1924,21 +1710,10 @@ class HealthMonitor:
if health_persistence.is_error_active(error_key):
health_persistence.clear_error(error_key)
# If all failed services are dismissed, return OK
if not active_failed:
return {
'status': 'OK',
'reason': None,
'failed': [],
'is_cluster': is_cluster,
'services_checked': len(services_to_check),
'checks': checks
}
return {
'status': 'CRITICAL',
'reason': f'Services inactive: {", ".join(active_failed)}',
'failed': active_failed,
'reason': reason,
'failed': failed_services,
'is_cluster': is_cluster,
'services_checked': len(services_to_check),
'checks': checks
@@ -2096,8 +1871,7 @@ class HealthMonitor:
self.persistent_log_patterns[pattern] = {
'count': 1,
'first_seen': current_time,
'last_seen': current_time,
'sample': line.strip()[:200], # Original line for display
'last_seen': current_time
}
for line in previous_lines:
@@ -2129,18 +1903,6 @@ class HealthMonitor:
if recent_count >= 5 and recent_count >= prev_count * 4:
spike_errors[pattern] = recent_count
# Helper: get human-readable samples from normalized patterns
def _get_samples(error_dict, max_items=3):
"""Return list of readable sample lines for error patterns."""
samples = []
for pattern in list(error_dict.keys())[:max_items]:
pdata = self.persistent_log_patterns.get(pattern, {})
sample = pdata.get('sample', pattern)
# Trim timestamp prefix if present (e.g. "Feb 27 16:03:35 host ")
clean = re.sub(r'^[A-Z][a-z]{2}\s+\d+\s+[\d:]+\s+\S+\s+', '', sample)
samples.append(clean[:120])
return samples
persistent_errors = {}
for pattern, data in self.persistent_log_patterns.items():
time_span = current_time - data['first_seen']
@@ -2151,16 +1913,12 @@ class HealthMonitor:
pattern_hash = hashlib.md5(pattern.encode()).hexdigest()[:8]
error_key = f'log_persistent_{pattern_hash}'
if not health_persistence.is_error_active(error_key, category='logs'):
# Use the original sample line for the notification,
# not the normalized pattern (which has IDs replaced).
sample = data.get('sample', pattern)
health_persistence.record_error(
error_key=error_key,
category='logs',
severity='WARNING',
reason=f'Recurring error ({data["count"]}x): {sample[:150]}',
details={'pattern': pattern, 'sample': sample,
'dismissable': True, 'occurrences': data['count']}
reason=f'Persistent error pattern detected: {pattern[:80]}',
details={'pattern': pattern, 'dismissable': True, 'occurrences': data['count']}
)
patterns_to_remove = [
@@ -2182,33 +1940,26 @@ class HealthMonitor:
reason = f'Critical error detected: {representative_error[:100]}'
elif cascade_count > 0:
status = 'WARNING'
samples = _get_samples(cascading_errors, 3)
reason = f'Error cascade ({cascade_count} patterns repeating):\n' + '\n'.join(f' - {s}' for s in samples)
reason = f'Error cascade detected: {cascade_count} pattern(s) repeating ≥15 times in 3min'
elif spike_count > 0:
status = 'WARNING'
samples = _get_samples(spike_errors, 3)
reason = f'Error spike ({spike_count} patterns with 4x increase):\n' + '\n'.join(f' - {s}' for s in samples)
reason = f'Error spike detected: {spike_count} pattern(s) increased 4x'
elif persistent_count > 0:
status = 'WARNING'
samples = _get_samples(persistent_errors, 3)
reason = f'Persistent errors ({persistent_count} patterns over 15+ min):\n' + '\n'.join(f' - {s}' for s in samples)
reason = f'Persistent errors: {persistent_count} pattern(s) recurring over 15+ minutes'
else:
# No significant issues found
status = 'OK'
reason = None
# Record/clear persistent errors for each log sub-check so Dismiss works
cascade_samples = _get_samples(cascading_errors, 2) if cascade_count else []
spike_samples = _get_samples(spike_errors, 2) if spike_count else []
persist_samples = _get_samples(persistent_errors, 2) if persistent_count else []
log_sub_checks = {
'log_error_cascade': {'active': cascade_count > 0, 'severity': 'WARNING',
'reason': f'{cascade_count} pattern(s) repeating >=15 times:\n' + '\n'.join(f' - {s}' for s in cascade_samples) if cascade_count else ''},
'reason': f'{cascade_count} pattern(s) repeating >=15 times'},
'log_error_spike': {'active': spike_count > 0, 'severity': 'WARNING',
'reason': f'{spike_count} pattern(s) with 4x increase:\n' + '\n'.join(f' - {s}' for s in spike_samples) if spike_count else ''},
'reason': f'{spike_count} pattern(s) with 4x increase'},
'log_persistent_errors': {'active': persistent_count > 0, 'severity': 'WARNING',
'reason': f'{persistent_count} recurring pattern(s) over 15+ min:\n' + '\n'.join(f' - {s}' for s in persist_samples) if persistent_count else ''},
'reason': f'{persistent_count} recurring pattern(s) over 15+ min'},
'log_critical_errors': {'active': unique_critical_count > 0, 'severity': 'CRITICAL',
'reason': f'{unique_critical_count} critical error(s) found', 'dismissable': False},
}
@@ -2584,7 +2335,20 @@ class HealthMonitor:
msg = f'{total_banned} IP(s) currently banned by Fail2Ban (jails: {jails_str})'
result['status'] = 'WARNING'
result['detail'] = msg
# Persistence handled by _check_security caller via security_fail2ban key
# Record in persistence (dismissable)
health_persistence.record_error(
error_key='fail2ban',
category='security',
severity='WARNING',
reason=msg,
details={
'banned_count': total_banned,
'jails': jails_with_bans,
'banned_ips': all_banned_ips[:5],
'dismissable': True
}
)
else:
result['detail'] = f'Fail2Ban active ({len(jails)} jail(s), no current bans)'
# Auto-resolve if previously banned IPs are now gone
@@ -2692,60 +2456,14 @@ class HealthMonitor:
except Exception:
pass
# Persist errors and respect dismiss for each sub-check
dismissed_keys = set()
security_sub_checks = {
'security_login_attempts': checks.get('login_attempts', {}),
'security_certificates': checks.get('certificates', {}),
'security_uptime': checks.get('uptime', {}),
'security_fail2ban': checks.get('fail2ban', {}),
}
for err_key, check_info in security_sub_checks.items():
check_status = check_info.get('status', 'OK')
if check_status not in ('OK', 'INFO'):
is_dismissable = check_info.get('dismissable', True)
rec_result = health_persistence.record_error(
error_key=err_key,
category='security',
severity=check_status,
reason=check_info.get('detail', ''),
details={'dismissable': is_dismissable}
)
if rec_result and rec_result.get('type') == 'skipped_acknowledged':
dismissed_keys.add(err_key)
elif health_persistence.is_error_active(err_key):
health_persistence.clear_error(err_key)
# Rebuild issues excluding dismissed sub-checks
key_to_check = {
'security_login_attempts': 'login_attempts',
'security_certificates': 'certificates',
'security_uptime': 'uptime',
'security_fail2ban': 'fail2ban',
}
active_issues = []
for err_key, check_name in key_to_check.items():
if err_key in dismissed_keys:
# Mark as dismissed in checks for the frontend
if check_name in checks:
checks[check_name]['dismissed'] = True
continue
check_info = checks.get(check_name, {})
if check_info.get('status', 'OK') not in ('OK', 'INFO'):
active_issues.append(check_info.get('detail', ''))
# Determine overall security status from non-dismissed issues only
if active_issues:
has_critical = any(
c.get('status') == 'CRITICAL'
for k, c in checks.items()
if f'security_{k}' not in dismissed_keys
)
# Determine overall security status
if issues:
# Check if any sub-check is CRITICAL
has_critical = any(c.get('status') == 'CRITICAL' for c in checks.values())
overall_status = 'CRITICAL' if has_critical else 'WARNING'
return {
'status': overall_status,
'reason': '; '.join(active_issues[:2]),
'reason': '; '.join(issues[:2]),
'checks': checks
}

View File

@@ -25,8 +25,12 @@ from pathlib import Path
class HealthPersistence:
"""Manages persistent health error tracking"""
# Default suppression duration when no user setting exists for a category.
# Users override per-category via the Suppression Duration settings UI.
# Error retention periods (seconds)
VM_ERROR_RETENTION = 48 * 3600 # 48 hours
LOG_ERROR_RETENTION = 24 * 3600 # 24 hours
DISK_ERROR_RETENTION = 48 * 3600 # 48 hours
# Default suppression: 24 hours (user can change per-category in settings)
DEFAULT_SUPPRESSION_HOURS = 24
# Mapping from error categories to settings keys
@@ -110,31 +114,6 @@ class HealthPersistence:
)
''')
# Notification history table (records all sent notifications)
cursor.execute('''
CREATE TABLE IF NOT EXISTS notification_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT NOT NULL,
channel TEXT NOT NULL,
title TEXT,
message TEXT,
severity TEXT,
sent_at TEXT NOT NULL,
success INTEGER DEFAULT 1,
error_message TEXT,
source TEXT DEFAULT 'server'
)
''')
# Notification cooldown persistence (survives restarts)
cursor.execute('''
CREATE TABLE IF NOT EXISTS notification_last_sent (
fingerprint TEXT PRIMARY KEY,
last_sent_ts INTEGER NOT NULL,
count INTEGER DEFAULT 1
)
''')
# Migration: add suppression_hours column to errors if not present
cursor.execute("PRAGMA table_info(errors)")
columns = [col[1] for col in cursor.fetchall()]
@@ -146,9 +125,6 @@ class HealthPersistence:
cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON errors(category)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_resolved ON errors(resolved_at)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_events_error ON events(error_key)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_notif_sent_at ON notification_history(sent_at)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_notif_severity ON notification_history(severity)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_nls_ts ON notification_last_sent(last_sent_ts)')
conn.commit()
conn.close()
@@ -492,58 +468,32 @@ class HealthPersistence:
cursor = conn.cursor()
now = datetime.now()
now_iso = now.isoformat()
# Delete resolved errors older than 7 days
cutoff_resolved = (now - timedelta(days=7)).isoformat()
cursor.execute('DELETE FROM errors WHERE resolved_at < ?', (cutoff_resolved,))
# ── Auto-resolve stale errors using Suppression Duration settings ──
# Read per-category suppression hours from user_settings.
# If the user hasn't configured a value, use DEFAULT_SUPPRESSION_HOURS.
# This is the SINGLE source of truth for auto-resolution timing.
user_settings = {}
try:
cursor.execute(
'SELECT setting_key, setting_value FROM user_settings WHERE setting_key LIKE ?',
('suppress_%',)
)
for row in cursor.fetchall():
user_settings[row[0]] = row[1]
except Exception:
pass
for category, setting_key in self.CATEGORY_SETTING_MAP.items():
stored = user_settings.get(setting_key)
try:
hours = int(stored) if stored else self.DEFAULT_SUPPRESSION_HOURS
except (ValueError, TypeError):
hours = self.DEFAULT_SUPPRESSION_HOURS
# -1 means permanently suppressed -- skip auto-resolve
if hours < 0:
continue
cutoff = (now - timedelta(hours=hours)).isoformat()
cursor.execute('''
UPDATE errors
SET resolved_at = ?
WHERE category = ?
AND resolved_at IS NULL
AND last_seen < ?
AND acknowledged = 0
''', (now_iso, category, cutoff))
# Catch-all: auto-resolve any error from an unmapped category
# whose last_seen exceeds DEFAULT_SUPPRESSION_HOURS.
fallback_cutoff = (now - timedelta(hours=self.DEFAULT_SUPPRESSION_HOURS)).isoformat()
# Auto-resolve VM/CT errors older than 48h
cutoff_vm = (now - timedelta(seconds=self.VM_ERROR_RETENTION)).isoformat()
cursor.execute('''
UPDATE errors
UPDATE errors
SET resolved_at = ?
WHERE resolved_at IS NULL
WHERE category = 'vms'
AND resolved_at IS NULL
AND first_seen < ?
AND acknowledged = 0
AND last_seen < ?
''', (now_iso, fallback_cutoff))
''', (now.isoformat(), cutoff_vm))
# Auto-resolve log errors older than 24h
cutoff_logs = (now - timedelta(seconds=self.LOG_ERROR_RETENTION)).isoformat()
cursor.execute('''
UPDATE errors
SET resolved_at = ?
WHERE category = 'logs'
AND resolved_at IS NULL
AND first_seen < ?
AND acknowledged = 0
''', (now.isoformat(), cutoff_logs))
# Delete old events (>30 days)
cutoff_events = (now - timedelta(days=30)).isoformat()

View File

@@ -1,579 +0,0 @@
"""
ProxMenux Notification Channels
Provides transport adapters for Telegram, Gotify, and Discord.
Each channel implements send() and test() with:
- Retry with exponential backoff (3 attempts)
- Request timeout of 10s
- Rate limiting (max 30 msg/min per channel)
Author: MacRimi
"""
import json
import time
import urllib.request
import urllib.error
import urllib.parse
from abc import ABC, abstractmethod
from collections import deque
from typing import Tuple, Optional, Dict, Any
# ─── Rate Limiter ────────────────────────────────────────────────
class RateLimiter:
    """Sliding-window rate limiter: at most `max_calls` calls per `window_seconds`."""

    def __init__(self, max_calls: int = 30, window_seconds: int = 60):
        self.max_calls = max_calls
        self.window = window_seconds
        self._timestamps: deque = deque()  # monotonic times of recent calls

    def allow(self) -> bool:
        """Register one call attempt; True iff it fits inside the window."""
        current = time.monotonic()
        # Evict timestamps that have aged out of the sliding window.
        while self._timestamps and current - self._timestamps[0] > self.window:
            self._timestamps.popleft()
        if len(self._timestamps) < self.max_calls:
            self._timestamps.append(current)
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until the oldest recorded call leaves the window (0.0 if none)."""
        if not self._timestamps:
            return 0.0
        elapsed = time.monotonic() - self._timestamps[0]
        return max(0.0, self.window - elapsed)
# ─── Base Channel ────────────────────────────────────────────────
class NotificationChannel(ABC):
    """Abstract base for all notification channels.

    Concrete channels implement send()/test()/validate_config(); this base
    supplies the HTTP plumbing, retry with exponential backoff, and per-channel
    rate limiting shared by every transport.
    """

    MAX_RETRIES = 3
    RETRY_DELAYS = [2, 4, 8]  # exponential backoff seconds between attempts
    REQUEST_TIMEOUT = 10      # seconds per HTTP request

    def __init__(self):
        # Shared policy: at most 30 messages per minute per channel.
        self._rate_limiter = RateLimiter(max_calls=30, window_seconds=60)

    @abstractmethod
    def send(self, title: str, message: str, severity: str = 'INFO',
             data: Optional[Dict] = None) -> Dict[str, Any]:
        """Send a notification. Returns {success, error, channel}."""
        pass

    @abstractmethod
    def test(self) -> Tuple[bool, str]:
        """Send a test message. Returns (success, error_message)."""
        pass

    @abstractmethod
    def validate_config(self) -> Tuple[bool, str]:
        """Check if config is valid without sending. Returns (valid, error)."""
        pass

    def _http_request(self, url: str, data: bytes, headers: Dict[str, str],
                      method: str = 'POST') -> Tuple[int, str]:
        """Execute an HTTP request with timeout. Returns (status_code, body).

        Network-level failures are reported as status 0 with the error text
        as the body, so callers never see an exception from here.
        """
        request = urllib.request.Request(url, data=data, headers=headers, method=method)
        try:
            with urllib.request.urlopen(request, timeout=self.REQUEST_TIMEOUT) as response:
                return response.status, response.read().decode('utf-8', errors='replace')
        except urllib.error.HTTPError as http_err:
            # Server responded with an error status; keep its body if present.
            if http_err.fp:
                detail = http_err.read().decode('utf-8', errors='replace')
            else:
                detail = str(http_err)
            return http_err.code, detail
        except urllib.error.URLError as url_err:
            return 0, str(url_err.reason)
        except Exception as exc:
            return 0, str(exc)

    def _send_with_retry(self, send_fn) -> Dict[str, Any]:
        """Wrap a send function with rate limiting and retry logic.

        `send_fn` must return (status_code, body). Retries up to MAX_RETRIES
        times with exponential backoff; a 2xx status is success.
        """
        if not self._rate_limiter.allow():
            return {
                'success': False,
                'error': f'Rate limited. Retry in {self._rate_limiter.wait_time():.0f}s',
                'rate_limited': True,
            }
        failure = ''
        for attempt in range(self.MAX_RETRIES):
            try:
                status, body = send_fn()
                if 200 <= status < 300:
                    return {'success': True, 'error': None}
                failure = f'HTTP {status}: {body[:200]}'
            except Exception as exc:
                failure = str(exc)
            # Back off before every attempt except the last.
            if attempt + 1 < self.MAX_RETRIES:
                time.sleep(self.RETRY_DELAYS[attempt])
        return {'success': False, 'error': failure}
# ─── Telegram ────────────────────────────────────────────────────
class TelegramChannel(NotificationChannel):
    """Telegram Bot API channel using HTML parse mode.

    Long messages are split at newline boundaries so every chunk stays
    within Telegram's 4096-character per-message limit.
    """
    API_BASE = 'https://api.telegram.org/bot{token}/sendMessage'
    MAX_LENGTH = 4096  # hard Telegram API limit per message
    SEVERITY_ICONS = {
        'CRITICAL': '\U0001F534',  # red circle
        'WARNING': '\U0001F7E1',   # yellow circle
        'INFO': '\U0001F535',      # blue circle
        'OK': '\U0001F7E2',        # green circle
        'UNKNOWN': '\u26AA',       # white circle
    }

    def __init__(self, bot_token: str, chat_id: str):
        super().__init__()
        token = bot_token.strip()
        # Strip 'bot' prefix if user included it (API_BASE already adds it)
        if token.lower().startswith('bot') and ':' in token[3:]:
            token = token[3:]
        self.bot_token = token
        self.chat_id = chat_id.strip()

    def validate_config(self) -> Tuple[bool, str]:
        """Check that a token and chat id are present and the token looks sane."""
        if not self.bot_token:
            return False, 'Bot token is required'
        if not self.chat_id:
            return False, 'Chat ID is required'
        if ':' not in self.bot_token:
            return False, 'Invalid bot token format (expected BOT_ID:TOKEN)'
        return True, ''

    def send(self, title: str, message: str, severity: str = 'INFO',
             data: Optional[Dict] = None) -> Dict[str, Any]:
        """Send an HTML-formatted message, chunked if over MAX_LENGTH."""
        icon = self.SEVERITY_ICONS.get(severity, self.SEVERITY_ICONS['INFO'])
        html_msg = f"<b>{icon} {self._escape_html(title)}</b>\n\n{self._escape_html(message)}"
        # Split long messages; stop at the first chunk that fails.
        chunks = self._split_message(html_msg)
        result = {'success': True, 'error': None, 'channel': 'telegram'}
        for chunk in chunks:
            res = self._send_with_retry(lambda c=chunk: self._post_message(c))
            if not res['success']:
                result = {**res, 'channel': 'telegram'}
                break
        return result

    def test(self) -> Tuple[bool, str]:
        """Validate the config, then send a real test message."""
        valid, err = self.validate_config()
        if not valid:
            return False, err
        result = self.send(
            'ProxMenux Test',
            'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.',
            'INFO'
        )
        # Normalize error=None (success case) to '' so the declared
        # Tuple[bool, str] return type actually holds.
        return result['success'], result.get('error') or ''

    def _post_message(self, text: str) -> Tuple[int, str]:
        """POST one message chunk to the Bot API. Returns (status, body)."""
        url = self.API_BASE.format(token=self.bot_token)
        payload = json.dumps({
            'chat_id': self.chat_id,
            'text': text,
            'parse_mode': 'HTML',
            'disable_web_page_preview': True,
        }).encode('utf-8')
        return self._http_request(url, payload, {'Content-Type': 'application/json'})

    def _split_message(self, text: str) -> list:
        """Split text into <= MAX_LENGTH chunks, preferring newline boundaries.

        Fix over the previous version: when the only newline in range was at
        index 0 (rfind returned 0) an empty chunk was emitted, which the
        Telegram API rejects.  Empty chunks are now skipped and forward
        progress is always guaranteed.
        """
        if len(text) <= self.MAX_LENGTH:
            return [text]
        chunks = []
        while text:
            if len(text) <= self.MAX_LENGTH:
                chunks.append(text)
                break
            split_at = text.rfind('\n', 0, self.MAX_LENGTH)
            if split_at <= 0:
                # No usable newline in range: hard split at the limit.
                split_at = self.MAX_LENGTH
            chunk = text[:split_at]
            if chunk:
                chunks.append(chunk)
            text = text[split_at:].lstrip('\n')
        return chunks

    @staticmethod
    def _escape_html(text: str) -> str:
        """Escape &, <, > for Telegram HTML parse mode ('&' must come first)."""
        return (text
                .replace('&', '&amp;')
                .replace('<', '&lt;')
                .replace('>', '&gt;'))
# ─── Gotify ──────────────────────────────────────────────────────
class GotifyChannel(NotificationChannel):
    """Gotify push notification channel with severity-to-priority mapping."""
    PRIORITY_MAP = {
        'OK': 1,
        'INFO': 2,
        'UNKNOWN': 3,
        'WARNING': 5,
        'CRITICAL': 10,
    }

    def __init__(self, server_url: str, app_token: str):
        super().__init__()
        # Strip whitespace FIRST, then the trailing slash -- the previous
        # order left "http://host/ " as "http://host/" with the slash kept.
        self.server_url = server_url.strip().rstrip('/')
        self.app_token = app_token.strip()

    def validate_config(self) -> Tuple[bool, str]:
        """Require a http(s) server URL and an application token."""
        if not self.server_url:
            return False, 'Server URL is required'
        if not self.app_token:
            return False, 'Application token is required'
        if not self.server_url.startswith(('http://', 'https://')):
            return False, 'Server URL must start with http:// or https://'
        return True, ''

    def send(self, title: str, message: str, severity: str = 'INFO',
             data: Optional[Dict] = None) -> Dict[str, Any]:
        """Push a message with the priority mapped from severity."""
        priority = self.PRIORITY_MAP.get(severity, 2)
        result = self._send_with_retry(
            lambda: self._post_message(title, message, priority)
        )
        result['channel'] = 'gotify'
        return result

    def test(self) -> Tuple[bool, str]:
        """Validate the config, then send a real test message."""
        valid, err = self.validate_config()
        if not valid:
            return False, err
        result = self.send(
            'ProxMenux Test',
            'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.',
            'INFO'
        )
        return result['success'], result.get('error', '')

    def _post_message(self, title: str, message: str, priority: int) -> Tuple[int, str]:
        """POST one message to Gotify. Returns (status, body).

        The app token is sent via the X-Gotify-Key header (supported by the
        Gotify API) instead of the ?token= query string, so it does not end
        up in proxy or web-server access logs.
        """
        url = f"{self.server_url}/message"
        payload = json.dumps({
            'title': title,
            'message': message,
            'priority': priority,
            'extras': {
                'client::display': {'contentType': 'text/markdown'}
            }
        }).encode('utf-8')
        return self._http_request(url, payload, {
            'Content-Type': 'application/json',
            'X-Gotify-Key': self.app_token,
        })
# ─── Discord ─────────────────────────────────────────────────────
class DiscordChannel(NotificationChannel):
    """Discord webhook channel with color-coded embeds."""
    MAX_EMBED_DESC = 2048
    MAX_EMBED_TITLE = 256   # Discord rejects embeds whose title exceeds 256 chars
    SEVERITY_COLORS = {
        'CRITICAL': 0xED4245,  # red
        'WARNING': 0xFEE75C,   # yellow
        'INFO': 0x5865F2,      # blurple
        'OK': 0x57F287,        # green
        'UNKNOWN': 0x99AAB5,   # grey
    }

    def __init__(self, webhook_url: str):
        super().__init__()
        self.webhook_url = webhook_url.strip()

    def validate_config(self) -> Tuple[bool, str]:
        """Accept both discord.com and the legacy discordapp.com webhook hosts."""
        if not self.webhook_url:
            return False, 'Webhook URL is required'
        if ('discord.com/api/webhooks/' not in self.webhook_url
                and 'discordapp.com/api/webhooks/' not in self.webhook_url):
            return False, 'Invalid Discord webhook URL'
        return True, ''

    def send(self, title: str, message: str, severity: str = 'INFO',
             data: Optional[Dict] = None) -> Dict[str, Any]:
        """Post a single color-coded embed to the webhook."""
        color = self.SEVERITY_COLORS.get(severity, 0x5865F2)
        desc = message[:self.MAX_EMBED_DESC] if len(message) > self.MAX_EMBED_DESC else message
        embed = {
            # Truncate to Discord's embed-title limit to avoid a 400 response.
            'title': title[:self.MAX_EMBED_TITLE],
            'description': desc,
            'color': color,
            'footer': {'text': 'ProxMenux Monitor'},
            'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
        }
        # Use structured fields from render_template if available
        rendered_fields = (data or {}).get('_rendered_fields', [])
        if rendered_fields:
            embed['fields'] = [
                {'name': name, 'value': val[:1024], 'inline': True}
                for name, val in rendered_fields[:25]  # Discord limit: 25 fields
            ]
        elif data:
            fields = []
            if data.get('category'):
                fields.append({'name': 'Category', 'value': data['category'], 'inline': True})
            if data.get('hostname'):
                fields.append({'name': 'Host', 'value': data['hostname'], 'inline': True})
            if data.get('severity'):
                fields.append({'name': 'Severity', 'value': data['severity'], 'inline': True})
            if fields:
                embed['fields'] = fields
        result = self._send_with_retry(
            lambda: self._post_webhook(embed)
        )
        result['channel'] = 'discord'
        return result

    def test(self) -> Tuple[bool, str]:
        """Validate the config, then send a real test message."""
        valid, err = self.validate_config()
        if not valid:
            return False, err
        result = self.send(
            'ProxMenux Test',
            'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.',
            'INFO'
        )
        return result['success'], result.get('error', '')

    def _post_webhook(self, embed: Dict) -> Tuple[int, str]:
        """POST the embed payload to the webhook URL. Returns (status, body)."""
        payload = json.dumps({
            'username': 'ProxMenux',
            'embeds': [embed]
        }).encode('utf-8')
        return self._http_request(
            self.webhook_url, payload, {'Content-Type': 'application/json'}
        )
# ─── Email Channel ──────────────────────────────────────────────
class EmailChannel(NotificationChannel):
    """Email notification channel using SMTP (smtplib) or sendmail fallback.

    Config keys:
        host, port, username, password, tls_mode (none|starttls|ssl),
        from_address, to_addresses (comma-separated), subject_prefix, timeout
    """

    def __init__(self, config: Dict[str, str]):
        super().__init__()
        self.host = config.get('host', '')
        self.port = int(config.get('port', 587) or 587)
        self.username = config.get('username', '')
        self.password = config.get('password', '')
        self.tls_mode = config.get('tls_mode', 'starttls')  # none | starttls | ssl
        self.from_address = config.get('from_address', '')
        self.to_addresses = self._parse_recipients(config.get('to_addresses', ''))
        self.subject_prefix = config.get('subject_prefix', '[ProxMenux]')
        self.timeout = int(config.get('timeout', 10) or 10)

    @staticmethod
    def _parse_recipients(raw) -> list:
        """Accept a list or a comma-separated string; drop blank entries."""
        if isinstance(raw, list):
            return [a.strip() for a in raw if a.strip()]
        return [addr.strip() for addr in str(raw).split(',') if addr.strip()]

    def validate_config(self) -> Tuple[bool, str]:
        """Require recipients + from address, and an SMTP host or local sendmail."""
        if not self.to_addresses:
            return False, 'No recipients configured'
        if not self.from_address:
            return False, 'No from address configured'
        # Must have SMTP host OR local sendmail available
        if not self.host:
            import os
            if not os.path.exists('/usr/sbin/sendmail'):
                return False, 'No SMTP host configured and /usr/sbin/sendmail not found'
        return True, ''

    def send(self, title: str, message: str, severity: str = 'INFO',
             data: Optional[Dict] = None) -> Dict[str, Any]:
        """Send via SMTP when a host is configured, else via local sendmail."""
        subject = f"{self.subject_prefix} [{severity}] {title}"

        def _do_send():
            if self.host:
                return self._send_smtp(subject, message, severity)
            return self._send_sendmail(subject, message, severity)

        result = self._send_with_retry(_do_send)
        # Tag the result like the other channels do, for uniform reporting.
        result['channel'] = 'email'
        return result

    def _send_smtp(self, subject: str, body: str, severity: str) -> Tuple[int, str]:
        """Deliver the message through the configured SMTP server.

        Returns (200, 'OK') on success or (0, reason) on failure.
        """
        import smtplib
        from email.message import EmailMessage
        msg = EmailMessage()
        msg['Subject'] = subject
        msg['From'] = self.from_address
        msg['To'] = ', '.join(self.to_addresses)
        msg.set_content(body)
        # Add HTML alternative
        html_body = self._format_html(subject, body, severity)
        if html_body:
            msg.add_alternative(html_body, subtype='html')
        try:
            if self.tls_mode == 'ssl':
                smtp_cls, use_starttls = smtplib.SMTP_SSL, False
            else:
                smtp_cls, use_starttls = smtplib.SMTP, self.tls_mode == 'starttls'
            # Context manager closes the connection even when STARTTLS, login
            # or send fails -- the previous version leaked the socket on those
            # error paths because quit() was only reached on success.
            with smtp_cls(self.host, self.port, timeout=self.timeout) as server:
                if use_starttls:
                    server.starttls()
                if self.username and self.password:
                    server.login(self.username, self.password)
                server.send_message(msg)
            return 200, 'OK'
        except smtplib.SMTPAuthenticationError as e:
            return 0, f'SMTP authentication failed: {e}'
        except smtplib.SMTPConnectError as e:
            return 0, f'SMTP connection failed: {e}'
        except smtplib.SMTPException as e:
            return 0, f'SMTP error: {e}'
        except (OSError, TimeoutError) as e:
            return 0, f'Connection error: {e}'

    def _send_sendmail(self, subject: str, body: str, severity: str) -> Tuple[int, str]:
        """Deliver the message through the local sendmail binary."""
        import os
        import subprocess
        from email.message import EmailMessage
        sendmail = '/usr/sbin/sendmail'
        if not os.path.exists(sendmail):
            return 0, 'sendmail not found at /usr/sbin/sendmail'
        msg = EmailMessage()
        msg['Subject'] = subject
        msg['From'] = self.from_address or 'proxmenux@localhost'
        msg['To'] = ', '.join(self.to_addresses)
        msg.set_content(body)
        try:
            # -t: read recipients from the headers; -oi: don't treat "." as EOF
            proc = subprocess.run(
                [sendmail, '-t', '-oi'],
                input=msg.as_string(), capture_output=True, text=True, timeout=30
            )
            if proc.returncode == 0:
                return 200, 'OK'
            return 0, f'sendmail failed (rc={proc.returncode}): {proc.stderr[:200]}'
        except subprocess.TimeoutExpired:
            return 0, 'sendmail timed out after 30s'
        except Exception as e:
            return 0, f'sendmail error: {e}'

    @staticmethod
    def _format_html(subject: str, body: str, severity: str) -> str:
        """Create a professional HTML body for the multipart alternative."""
        import html as html_mod
        severity_colors = {'CRITICAL': '#dc2626', 'WARNING': '#f59e0b', 'INFO': '#3b82f6'}
        color = severity_colors.get(severity, '#6b7280')
        body_html = ''.join(
            f'<p style="margin:4px 0;color:#374151;">{html_mod.escape(line)}</p>'
            for line in body.split('\n') if line.strip()
        )
        return f'''<!DOCTYPE html>
<html><body style="font-family:-apple-system,Arial,sans-serif;background:#f3f4f6;padding:20px;">
<div style="max-width:600px;margin:0 auto;background:#fff;border-radius:8px;overflow:hidden;">
<div style="background:{color};padding:16px 24px;">
<h2 style="color:#fff;margin:0;font-size:16px;">ProxMenux Monitor</h2>
<p style="color:rgba(255,255,255,0.85);margin:4px 0 0;font-size:13px;">{html_mod.escape(severity)} Alert</p>
</div>
<div style="padding:24px;">
<h3 style="margin:0 0 12px;color:#111827;">{html_mod.escape(subject)}</h3>
{body_html}
</div>
<div style="background:#f9fafb;padding:12px 24px;border-top:1px solid #e5e7eb;">
<p style="margin:0;font-size:11px;color:#9ca3af;">Sent by ProxMenux Notification Service</p>
</div>
</div>
</body></html>'''

    def test(self) -> Tuple[bool, str]:
        """Send a real test email and report the outcome."""
        result = self.send(
            'ProxMenux Test Notification',
            'This is a test notification from ProxMenux Monitor.\n'
            'If you received this, your email channel is working correctly.',
            'INFO'
        )
        return result.get('success', False), result.get('error', '')
# ─── Channel Factory ─────────────────────────────────────────────
# Registry of supported channel types: maps the stored type id to its
# display name, the config keys the UI should collect, and the
# implementing class.  The ids match those handled by create_channel().
CHANNEL_TYPES = {
    'telegram': {
        'name': 'Telegram',
        'config_keys': ['bot_token', 'chat_id'],
        'class': TelegramChannel,
    },
    'gotify': {
        'name': 'Gotify',
        'config_keys': ['url', 'token'],
        'class': GotifyChannel,
    },
    'discord': {
        'name': 'Discord',
        'config_keys': ['webhook_url'],
        'class': DiscordChannel,
    },
    'email': {
        'name': 'Email (SMTP)',
        'config_keys': ['host', 'port', 'username', 'password', 'tls_mode',
                        'from_address', 'to_addresses', 'subject_prefix'],
        'class': EmailChannel,
    },
}
def create_channel(channel_type: str, config: Dict[str, str]) -> Optional[NotificationChannel]:
    """Create a channel instance from type name and config dict.

    Args:
        channel_type: One of 'telegram', 'gotify', 'discord' or 'email'
                      (the keys of CHANNEL_TYPES).
        config: Dict with channel-specific keys (see CHANNEL_TYPES).

    Returns:
        Channel instance, or None if the type is unknown or creation fails.
    """
    try:
        if channel_type == 'telegram':
            return TelegramChannel(
                bot_token=config.get('bot_token', ''),
                chat_id=config.get('chat_id', '')
            )
        elif channel_type == 'gotify':
            return GotifyChannel(
                server_url=config.get('url', ''),
                app_token=config.get('token', '')
            )
        elif channel_type == 'discord':
            return DiscordChannel(
                webhook_url=config.get('webhook_url', '')
            )
        elif channel_type == 'email':
            return EmailChannel(config)
        else:
            # Previously an unknown type fell through silently; log it so
            # misconfigured entries are diagnosable.
            print(f"[NotificationChannels] Unknown channel type: {channel_type}")
    except Exception as e:
        print(f"[NotificationChannels] Failed to create {channel_type}: {e}")
    return None

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,958 +0,0 @@
"""
ProxMenux Notification Templates
Message templates for all event types with per-channel formatting.
Templates use Python str.format() variables:
{hostname}, {severity}, {category}, {reason}, {summary},
{previous}, {current}, {vmid}, {vmname}, {timestamp}, etc.
Optional AI enhancement enriches messages with context/suggestions.
Author: MacRimi
"""
import json
import re
import socket
import time
import urllib.request
import urllib.error
from typing import Dict, Any, Optional, List
# ─── vzdump message parser ───────────────────────────────────────
def _parse_vzdump_message(message: str) -> Optional[Dict[str, Any]]:
"""Parse a PVE vzdump notification message into structured data.
Supports two formats:
1. Local storage: table with columns VMID Name Status Time Size Filename
2. PBS storage: log-style output with 'Finished Backup of VM NNN (HH:MM:SS)'
and sizes in lines like 'root.pxar: had to backup X of Y' or 'transferred X'
Returns dict with 'vms' list, 'total_time', 'total_size', or None.
"""
if not message:
return None
vms: List[Dict[str, str]] = []
total_time = ''
total_size = ''
lines = message.split('\n')
# ── Strategy 1: classic table (local/NFS/CIFS storage) ──
header_idx = -1
for i, line in enumerate(lines):
if re.match(r'\s*VMID\s+Name\s+Status', line, re.IGNORECASE):
header_idx = i
break
if header_idx >= 0:
# Use column positions from the header to slice each row.
# Header: "VMID Name Status Time Size Filename"
header = lines[header_idx]
col_starts = []
for col_name in ['VMID', 'Name', 'Status', 'Time', 'Size', 'Filename']:
idx = header.find(col_name)
if idx >= 0:
col_starts.append(idx)
if len(col_starts) == 6:
for line in lines[header_idx + 1:]:
stripped = line.strip()
if not stripped or stripped.startswith('Total') or stripped.startswith('Logs') or stripped.startswith('='):
break
# Pad line to avoid index errors
padded = line.ljust(col_starts[-1] + 50)
vmid = padded[col_starts[0]:col_starts[1]].strip()
name = padded[col_starts[1]:col_starts[2]].strip()
status = padded[col_starts[2]:col_starts[3]].strip()
time_val = padded[col_starts[3]:col_starts[4]].strip()
size = padded[col_starts[4]:col_starts[5]].strip()
filename = padded[col_starts[5]:].strip()
if vmid and vmid.isdigit():
vms.append({
'vmid': vmid,
'name': name,
'status': status,
'time': time_val,
'size': size,
'filename': filename,
})
# ── Strategy 2: log-style (PBS / Proxmox Backup Server) ──
# Parse from the full vzdump log lines.
# Look for patterns:
# "Starting Backup of VM NNN (lxc/qemu)" -> detect guest
# "CT Name: xxx" or "VM Name: xxx" -> guest name
# "Finished Backup of VM NNN (HH:MM:SS)" -> duration + status=ok
# "root.pxar: had to backup X of Y" -> size (CT)
# "transferred X in N seconds" -> size (QEMU)
# "creating ... archive 'ct/100/2026-..'" -> archive name for PBS
# "TASK ERROR:" or "ERROR:" -> status=error
if not vms:
current_vm: Optional[Dict[str, str]] = None
for line in lines:
# Remove "INFO: " prefix that PVE adds
clean = re.sub(r'^(?:INFO|WARNING|ERROR):\s*', '', line.strip())
# Start of a new VM backup
m_start = re.match(
r'Starting Backup of VM (\d+)\s+\((lxc|qemu)\)', clean)
if m_start:
if current_vm:
vms.append(current_vm)
current_vm = {
'vmid': m_start.group(1),
'name': '',
'status': 'ok',
'time': '',
'size': '',
'filename': '',
'type': m_start.group(2),
}
continue
if current_vm:
# Guest name
m_name = re.match(r'(?:CT|VM) Name:\s*(.+)', clean)
if m_name:
current_vm['name'] = m_name.group(1).strip()
continue
# PBS archive path -> extract as filename
m_archive = re.search(
r"creating .+ archive '([^']+)'", clean)
if m_archive:
current_vm['filename'] = m_archive.group(1)
continue
# Size for containers (pxar)
m_pxar = re.search(
r'root\.pxar:.*?of\s+([\d.]+\s+\S+)', clean)
if m_pxar:
current_vm['size'] = m_pxar.group(1)
continue
# Size for QEMU (transferred)
m_transfer = re.search(
r'transferred\s+([\d.]+\s+\S+)', clean)
if m_transfer:
current_vm['size'] = m_transfer.group(1)
continue
# Finished -> duration
m_finish = re.match(
r'Finished Backup of VM (\d+)\s+\(([^)]+)\)', clean)
if m_finish:
current_vm['time'] = m_finish.group(2)
current_vm['status'] = 'ok'
vms.append(current_vm)
current_vm = None
continue
# Error
if clean.startswith('ERROR:') or clean.startswith('TASK ERROR'):
if current_vm:
current_vm['status'] = 'error'
# Don't forget the last VM if it wasn't finished
if current_vm:
vms.append(current_vm)
# ── Extract totals ──
for line in lines:
m_time = re.search(r'Total running time:\s*(.+)', line)
if m_time:
total_time = m_time.group(1).strip()
m_size = re.search(r'Total size:\s*(.+)', line)
if m_size:
total_size = m_size.group(1).strip()
# For PBS: calculate total size if not explicitly stated
if not total_size and vms:
# Sum individual sizes if they share units
sizes_gib = 0.0
for vm in vms:
s = vm.get('size', '')
m = re.match(r'([\d.]+)\s+(.*)', s)
if m:
val = float(m.group(1))
unit = m.group(2).strip().upper()
if 'GIB' in unit or 'GB' in unit:
sizes_gib += val
elif 'MIB' in unit or 'MB' in unit:
sizes_gib += val / 1024
elif 'TIB' in unit or 'TB' in unit:
sizes_gib += val * 1024
if sizes_gib > 0:
if sizes_gib >= 1024:
total_size = f"{sizes_gib / 1024:.3f} TiB"
elif sizes_gib >= 1:
total_size = f"{sizes_gib:.3f} GiB"
else:
total_size = f"{sizes_gib * 1024:.3f} MiB"
# For PBS: calculate total time if not stated
if not total_time and vms:
total_secs = 0
for vm in vms:
t = vm.get('time', '')
# Parse HH:MM:SS format
m = re.match(r'(\d+):(\d+):(\d+)', t)
if m:
total_secs += int(m.group(1)) * 3600 + int(m.group(2)) * 60 + int(m.group(3))
if total_secs > 0:
hours = total_secs // 3600
mins = (total_secs % 3600) // 60
secs = total_secs % 60
if hours:
total_time = f"{hours}h {mins}m {secs}s"
elif mins:
total_time = f"{mins}m {secs}s"
else:
total_time = f"{secs}s"
if not vms and not total_size:
return None
return {
'vms': vms,
'total_time': total_time,
'total_size': total_size,
'vm_count': len(vms),
}
def _format_vzdump_body(parsed: Dict[str, Any], is_success: bool) -> str:
"""Format parsed vzdump data into a clean Telegram-friendly message."""
parts = []
for vm in parsed.get('vms', []):
status = vm.get('status', '').lower()
icon = '\u2705' if status == 'ok' else '\u274C'
parts.append(f"{icon} ID {vm['vmid']} ({vm['name']})")
details = []
if vm.get('size'):
details.append(f"Size: {vm['size']}")
if vm.get('time'):
details.append(f"Duration: {vm['time']}")
if vm.get('filename'):
fname = vm['filename']
# PBS archives look like "ct/100/2026-..." or "vm/105/2026-..."
if re.match(r'^(?:ct|vm)/\d+/', fname):
details.append(f"PBS: {fname}")
else:
details.append(f"File: {fname}")
if details:
parts.append(' | '.join(details))
parts.append('') # blank line between VMs
# Summary
vm_count = parsed.get('vm_count', 0)
if vm_count > 0 or parsed.get('total_size'):
ok_count = sum(1 for v in parsed.get('vms', [])
if v.get('status', '').lower() == 'ok')
fail_count = vm_count - ok_count
summary_parts = []
if vm_count:
summary_parts.append(f"{vm_count} backup(s)")
if fail_count:
summary_parts.append(f"{fail_count} failed")
if parsed.get('total_size'):
summary_parts.append(f"Total: {parsed['total_size']}")
if parsed.get('total_time'):
summary_parts.append(f"Time: {parsed['total_time']}")
if summary_parts:
parts.append('--- ' + ' | '.join(summary_parts))
return '\n'.join(parts)
# ─── Severity Icons ──────────────────────────────────────────────
# Unicode colored-circle icons used in plain-text/Telegram titles.
SEVERITY_ICONS = {
    'CRITICAL': '\U0001F534',  # red circle
    'WARNING': '\U0001F7E1',   # yellow circle
    'INFO': '\U0001F535',      # blue circle
    'OK': '\U0001F7E2',        # green circle
    'UNKNOWN': '\u26AA',       # white circle
}
# Discord shortcode equivalents of the same icons.
SEVERITY_ICONS_DISCORD = {
    'CRITICAL': ':red_circle:',
    'WARNING': ':yellow_circle:',
    'INFO': ':blue_circle:',
    'OK': ':green_circle:',
    'UNKNOWN': ':white_circle:',
}
# ─── Event Templates ─────────────────────────────────────────────
# Each template has a 'title' and 'body' with {variable} placeholders.
# 'group' is used for UI event filter grouping.
# 'default_enabled' controls initial state in settings.
# Each entry maps an event-type id to a template dict:
#   'title' / 'body'       -- str.format() strings; placeholders are filled by
#                             render_template() (missing keys fall back safely)
#   'group'                -- must be a key of EVENT_GROUPS (UI filtering)
#   'default_enabled'      -- initial on/off state in the settings UI
TEMPLATES = {
    # ── Health Monitor state changes ──
    # NOTE: state_change is disabled by default -- it fires on every
    # status oscillation (OK->WARNING->OK) which creates noise.
    # The health_persistent and new_error templates cover this better.
    'state_change': {
        'title': '{hostname}: {category} changed to {current}',
        'body': '{category} status changed from {previous} to {current}.\n{reason}',
        'group': 'system',
        'default_enabled': False,
    },
    'new_error': {
        'title': '{hostname}: New {severity} - {category}',
        'body': '{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'error_resolved': {
        'title': '{hostname}: Resolved - {category}',
        'body': '{reason}\nDuration: {duration}',
        'group': 'system',
        'default_enabled': True,
    },
    'error_escalated': {
        'title': '{hostname}: Escalated to {severity} - {category}',
        'body': '{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    # ── VM / CT events ──
    # Start/fail events default to enabled; routine stop/restart events are
    # opt-in to keep notification volume down.
    'vm_start': {
        'title': '{hostname}: VM {vmid} started',
        'body': '{vmname} ({vmid}) has been started.',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'vm_stop': {
        'title': '{hostname}: VM {vmid} stopped',
        'body': '{vmname} ({vmid}) has been stopped.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'vm_shutdown': {
        'title': '{hostname}: VM {vmid} shutdown',
        'body': '{vmname} ({vmid}) has been shut down.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'vm_fail': {
        'title': '{hostname}: VM {vmid} FAILED',
        'body': '{vmname} ({vmid}) has failed.\n{reason}',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'vm_restart': {
        'title': '{hostname}: VM {vmid} restarted',
        'body': '{vmname} ({vmid}) has been restarted.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_start': {
        'title': '{hostname}: CT {vmid} started',
        'body': '{vmname} ({vmid}) has been started.',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'ct_stop': {
        'title': '{hostname}: CT {vmid} stopped',
        'body': '{vmname} ({vmid}) has been stopped.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_shutdown': {
        'title': '{hostname}: CT {vmid} shutdown',
        'body': '{vmname} ({vmid}) has been shut down.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_restart': {
        'title': '{hostname}: CT {vmid} restarted',
        'body': '{vmname} ({vmid}) has been restarted.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    'ct_fail': {
        'title': '{hostname}: CT {vmid} FAILED',
        'body': '{vmname} ({vmid}) has failed.\n{reason}',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_start': {
        'title': '{hostname}: Migration started - {vmid}',
        'body': '{vmname} ({vmid}) migration to {target_node} started.',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_complete': {
        'title': '{hostname}: Migration complete - {vmid}',
        'body': '{vmname} ({vmid}) migrated successfully to {target_node}.',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'migration_fail': {
        'title': '{hostname}: Migration FAILED - {vmid}',
        'body': '{vmname} ({vmid}) migration to {target_node} failed.\n{reason}',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'replication_fail': {
        'title': '{hostname}: Replication FAILED - {vmid}',
        'body': 'Replication of {vmname} ({vmid}) has failed.\n{reason}',
        'group': 'vm_ct',
        'default_enabled': True,
    },
    'replication_complete': {
        'title': '{hostname}: Replication complete - {vmid}',
        'body': 'Replication of {vmname} ({vmid}) completed successfully.',
        'group': 'vm_ct',
        'default_enabled': False,
    },
    # ── Backup / Snapshot events ──
    'backup_start': {
        'title': '{hostname}: Backup started - {vmid}',
        'body': 'Backup of {vmname} ({vmid}) has started.',
        'group': 'backup',
        'default_enabled': False,
    },
    'backup_complete': {
        'title': '{hostname}: Backup complete - {vmid}',
        'body': 'Backup of {vmname} ({vmid}) completed successfully.\nSize: {size}',
        'group': 'backup',
        'default_enabled': True,
    },
    'backup_fail': {
        'title': '{hostname}: Backup FAILED - {vmid}',
        'body': 'Backup of {vmname} ({vmid}) has failed.\n{reason}',
        'group': 'backup',
        'default_enabled': True,
    },
    'snapshot_complete': {
        'title': '{hostname}: Snapshot created - {vmid}',
        'body': 'Snapshot of {vmname} ({vmid}) created: {snapshot_name}',
        'group': 'backup',
        'default_enabled': False,
    },
    'snapshot_fail': {
        'title': '{hostname}: Snapshot FAILED - {vmid}',
        'body': 'Snapshot of {vmname} ({vmid}) failed.\n{reason}',
        'group': 'backup',
        'default_enabled': True,
    },
    # ── Resource events (from Health Monitor) ──
    'cpu_high': {
        'title': '{hostname}: High CPU usage ({value}%)',
        'body': 'CPU usage is at {value}% on {cores} cores.\n{details}',
        'group': 'resources',
        'default_enabled': True,
    },
    'ram_high': {
        'title': '{hostname}: High memory usage ({value}%)',
        'body': 'Memory usage: {used} / {total} ({value}%).\n{details}',
        'group': 'resources',
        'default_enabled': True,
    },
    'temp_high': {
        'title': '{hostname}: High temperature ({value}C)',
        'body': 'CPU temperature: {value}C (threshold: {threshold}C).\n{details}',
        'group': 'resources',
        'default_enabled': True,
    },
    'disk_space_low': {
        'title': '{hostname}: Low disk space on {mount}',
        'body': '{mount}: {used}% used ({available} available).',
        'group': 'storage',
        'default_enabled': True,
    },
    'disk_io_error': {
        'title': '{hostname}: Disk I/O error',
        'body': '{reason}',
        'group': 'storage',
        'default_enabled': True,
    },
    'storage_unavailable': {
        'title': '{hostname}: Storage unavailable - {storage_name}',
        'body': 'PVE storage "{storage_name}" ({storage_type}) is not available.\n{reason}',
        'group': 'storage',
        'default_enabled': True,
    },
    'load_high': {
        'title': '{hostname}: High system load ({value})',
        'body': 'System load average: {value} on {cores} cores.\n{details}',
        'group': 'resources',
        'default_enabled': True,
    },
    # ── Network events ──
    'network_down': {
        'title': '{hostname}: Network connectivity lost',
        'body': 'Network connectivity check failed.\n{reason}',
        'group': 'network',
        'default_enabled': True,
    },
    'network_latency': {
        'title': '{hostname}: High network latency ({value}ms)',
        'body': 'Latency to gateway: {value}ms (threshold: {threshold}ms).',
        'group': 'network',
        'default_enabled': False,
    },
    # ── Security events ──
    'auth_fail': {
        'title': '{hostname}: Authentication failure',
        'body': 'Failed login attempt from {source_ip}.\nUser: {username}\nService: {service}',
        'group': 'security',
        'default_enabled': True,
    },
    'ip_block': {
        'title': '{hostname}: IP blocked by Fail2Ban',
        'body': 'IP {source_ip} has been banned.\nJail: {jail}\nFailures: {failures}',
        'group': 'security',
        'default_enabled': True,
    },
    'firewall_issue': {
        'title': '{hostname}: Firewall issue detected',
        'body': '{reason}',
        'group': 'security',
        'default_enabled': True,
    },
    'user_permission_change': {
        'title': '{hostname}: User permission changed',
        'body': 'User: {username}\nChange: {change_details}',
        'group': 'security',
        'default_enabled': True,
    },
    # ── Cluster events ──
    'split_brain': {
        'title': '{hostname}: SPLIT-BRAIN detected',
        'body': 'Cluster split-brain condition detected.\nQuorum status: {quorum}',
        'group': 'cluster',
        'default_enabled': True,
    },
    'node_disconnect': {
        'title': '{hostname}: Node disconnected',
        'body': 'Node {node_name} has disconnected from the cluster.',
        'group': 'cluster',
        'default_enabled': True,
    },
    'node_reconnect': {
        'title': '{hostname}: Node reconnected',
        'body': 'Node {node_name} has reconnected to the cluster.',
        'group': 'cluster',
        'default_enabled': True,
    },
    # ── System events ──
    'system_shutdown': {
        'title': '{hostname}: System shutting down',
        'body': 'The system is shutting down.\n{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'system_reboot': {
        'title': '{hostname}: System rebooting',
        'body': 'The system is rebooting.\n{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'system_problem': {
        'title': '{hostname}: System problem detected',
        'body': '{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'service_fail': {
        'title': '{hostname}: Service failed - {service_name}',
        'body': '{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'update_available': {
        'title': '{hostname}: Updates available ({count})',
        'body': '{count} package updates are available.\n{details}',
        'group': 'system',
        'default_enabled': False,
    },
    'update_complete': {
        'title': '{hostname}: Update completed',
        'body': '{details}',
        'group': 'system',
        'default_enabled': False,
    },
    # ── Unknown persistent (from health monitor) ──
    'unknown_persistent': {
        'title': '{hostname}: Check unavailable - {category}',
        'body': 'Health check for {category} has been unavailable for 3+ cycles.\n{reason}',
        'group': 'system',
        'default_enabled': False,
    },
    # ── Persistent Health Issues (daily digest) ──
    'health_persistent': {
        'title': '{hostname}: {count} active health issue(s)',
        'body': 'The following health issues remain active:\n{issue_list}\n\nThis digest is sent once every 24 hours while issues persist.',
        'group': 'system',
        'default_enabled': True,
    },
    'health_issue_new': {
        'title': '{hostname}: New health issue - {category}',
        'body': 'New {severity} issue detected:\n{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    'health_issue_resolved': {
        'title': '{hostname}: Resolved - {category}',
        'body': '{category} issue has been resolved.\n{reason}\nDuration: {duration}',
        'group': 'system',
        'default_enabled': True,
    },
    # ── Update notifications (enriched) ──
    'update_summary': {
        'title': '{hostname}: {total_count} updates available',
        'body': '{security_count} security update(s), {total_count} total.\n{package_list}',
        'group': 'system',
        'default_enabled': True,
    },
    'pve_update': {
        'title': '{hostname}: PVE update available ({version})',
        'body': 'Proxmox VE update available: {version}\n{details}',
        'group': 'system',
        'default_enabled': True,
    },
    # ── PVE webhook test ──
    'webhook_test': {
        'title': '{hostname}: Webhook test received',
        'body': 'PVE webhook connectivity test successful.\n{reason}',
        'group': 'system',
        'default_enabled': True,
    },
    # ── Burst aggregation summaries ──
    'burst_auth_fail': {
        'title': '{hostname}: {count} auth failures in {window}',
        'body': '{count} authentication failures detected in {window}.\nSources: {entity_list}',
        'group': 'security',
        'default_enabled': True,
    },
    'burst_ip_block': {
        'title': '{hostname}: Fail2Ban banned {count} IPs in {window}',
        'body': '{count} IPs banned by Fail2Ban in {window}.\nIPs: {entity_list}',
        'group': 'security',
        'default_enabled': True,
    },
    'burst_disk_io': {
        'title': '{hostname}: {count} disk I/O errors on {entity_list}',
        'body': '{count} I/O errors detected in {window}.\nDevices: {entity_list}',
        'group': 'storage',
        'default_enabled': True,
    },
    'burst_cluster': {
        'title': '{hostname}: Cluster flapping detected ({count} changes)',
        'body': 'Cluster state changed {count} times in {window}.\nNodes: {entity_list}',
        'group': 'cluster',
        'default_enabled': True,
    },
    'burst_generic': {
        'title': '{hostname}: {count} {event_type} events in {window}',
        'body': '{count} events of type {event_type} in {window}.\n{entity_list}',
        'group': 'system',
        'default_enabled': True,
    },
}
# ─── Event Groups (for UI filtering) ─────────────────────────────
# Maps group id -> UI label/description.  Every 'group' value used in
# TEMPLATES must be a key of this dict.
EVENT_GROUPS = {
    'system': {'label': 'System', 'description': 'System health, services, updates'},
    'vm_ct': {'label': 'VM / CT', 'description': 'Virtual machines and containers'},
    'backup': {'label': 'Backup', 'description': 'Backups and snapshots'},
    'resources': {'label': 'Resources', 'description': 'CPU, memory, temperature, load'},
    'storage': {'label': 'Storage', 'description': 'Disk space and I/O'},
    'network': {'label': 'Network', 'description': 'Connectivity and latency'},
    'security': {'label': 'Security', 'description': 'Authentication, firewall, bans'},
    'cluster': {'label': 'Cluster', 'description': 'Cluster health and quorum'},
}
# ─── Template Renderer ───────────────────────────────────────────
def _get_hostname() -> str:
"""Get short hostname for message titles."""
try:
return socket.gethostname().split('.')[0]
except Exception:
return 'proxmox'
def render_template(event_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Render a template into a structured notification object.

    Looks up ``event_type`` in TEMPLATES and substitutes its placeholders
    with values from ``data`` (plus empty-string defaults). Unknown event
    types produce a generic fallback notification instead of failing.

    Returns structured output usable by all channels:
        title, body (text), body_text, body_html (escaped), fields, tags, severity, group
    """
    import html as html_mod
    template = TEMPLATES.get(event_type)
    if not template:
        # Unknown event type: build a generic notification from whatever
        # free-text the payload carries ('message', then 'reason', then the
        # raw dict repr as a last resort).
        fallback_body = data.get('message', data.get('reason', str(data)))
        severity = data.get('severity', 'INFO')
        return {
            'title': f"{_get_hostname()}: {event_type}",
            'body': fallback_body, 'body_text': fallback_body,
            'body_html': f'<p>{html_mod.escape(str(fallback_body))}</p>',
            'fields': [], 'tags': [severity, 'system', event_type],
            'severity': severity, 'group': 'system',
        }
    # Ensure hostname is always available
    variables = {
        'hostname': _get_hostname(),
        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
        'severity': data.get('severity', 'INFO'),
        # Burst event variables
        'window': '', 'entity_list': '',
        # Common defaults: every placeholder gets an empty-string default so
        # .format() rarely hits KeyError regardless of what 'data' omits.
        'vmid': '', 'vmname': '', 'reason': '', 'summary': '',
        'details': '', 'category': '', 'previous': '', 'current': '',
        'duration': '', 'value': '', 'threshold': '',
        'source_ip': '', 'username': '', 'service': '', 'service_name': '',
        'node_name': '', 'target_node': '', 'mount': '', 'device': '',
        'used': '', 'total': '', 'available': '', 'cores': '',
        'count': '', 'size': '', 'snapshot_name': '', 'jail': '',
        'failures': '', 'quorum': '', 'change_details': '', 'message': '',
        'security_count': '0', 'total_count': '0', 'package_list': '',
        'packages': '', 'pve_packages': '', 'version': '',
        'issue_list': '', 'error_key': '',
        'storage_name': '', 'storage_type': '',
    }
    # Caller-supplied values override the defaults above.
    variables.update(data)
    try:
        title = template['title'].format(**variables)
    except (KeyError, ValueError):
        # Malformed template or unexpected placeholder: use the raw title.
        title = template['title']
    # ── PVE vzdump special formatting ──
    # When the event came from PVE webhook with a full vzdump message,
    # parse the table/logs and format a rich body instead of the sparse template.
    pve_message = data.get('pve_message', '')
    pve_title = data.get('pve_title', '')
    if event_type in ('backup_complete', 'backup_fail') and pve_message:
        parsed = _parse_vzdump_message(pve_message)
        if parsed:
            is_success = (event_type == 'backup_complete')
            body_text = _format_vzdump_body(parsed, is_success)
            # Use PVE's own title if available (contains hostname and status)
            if pve_title:
                title = pve_title
        else:
            # Couldn't parse -- use PVE raw message as body
            body_text = pve_message.strip()
    elif event_type == 'system_mail' and pve_message:
        # System mail -- use PVE message directly (mail bounce, cron, smartd)
        body_text = pve_message.strip()[:1000]
    else:
        # Normal path: render the template body with the merged variables.
        try:
            body_text = template['body'].format(**variables)
        except (KeyError, ValueError):
            body_text = template['body']
    # Clean up: collapse runs of 3+ newlines down to a single blank line,
    # and trim leading/trailing whitespace.
    import re as _re
    body_text = _re.sub(r'\n{3,}', '\n\n', body_text.strip())
    severity = variables.get('severity', 'INFO')
    group = template.get('group', 'system')
    # Build structured fields for Discord embeds / rich notifications
    fields = []
    field_map = [
        ('vmid', 'VM/CT'), ('vmname', 'Name'), ('device', 'Device'),
        ('source_ip', 'Source IP'), ('node_name', 'Node'), ('category', 'Category'),
        ('service_name', 'Service'), ('jail', 'Jail'), ('username', 'User'),
        ('count', 'Count'), ('window', 'Window'), ('entity_list', 'Affected'),
    ]
    for key, label in field_map:
        val = variables.get(key, '')
        # Only include fields that actually carry a value.
        if val:
            fields.append((label, str(val)))
    # Build HTML body with escaped content: one <p> per non-blank line.
    body_html_parts = []
    for line in body_text.split('\n'):
        if line.strip():
            body_html_parts.append(f'<p>{html_mod.escape(line)}</p>')
    body_html = '\n'.join(body_html_parts) if body_html_parts else f'<p>{html_mod.escape(body_text)}</p>'
    return {
        'title': title,
        'body': body_text,  # backward compat
        'body_text': body_text,
        'body_html': body_html,
        'fields': fields,
        'tags': [severity, group, event_type],
        'severity': severity,
        'group': group,
    }
def get_event_types_by_group() -> Dict[str, list]:
    """Get all event types organized by group, for UI rendering.

    Returns:
        {group_key: [{'type': event_type, 'title': template_title,
                      'default_enabled': bool}, ...]}
    """
    # Hoisted out of the loop: the original re-imported 're' on every
    # iteration, which is redundant loop-invariant work.
    import re

    result: Dict[str, list] = {}
    for event_type, template in TEMPLATES.items():
        group = template.get('group', 'system')
        # Clean title: remove the {hostname}: prefix and any remaining
        # {placeholders}, then tidy leftover separators.
        title = template['title'].replace('{hostname}', '').strip(': ')
        title = re.sub(r'\s*\{[^}]+\}', '', title).strip(' -:')
        if not title:
            # Nothing human-readable left -- derive a label from the key.
            title = event_type.replace('_', ' ').title()
        result.setdefault(group, []).append({
            'type': event_type,
            'title': title,
            'default_enabled': template.get('default_enabled', True),
        })
    return result
def get_default_enabled_events() -> Dict[str, bool]:
    """Get the default enabled state for all event types."""
    defaults: Dict[str, bool] = {}
    for event_type, template in TEMPLATES.items():
        defaults[event_type] = template.get('default_enabled', True)
    return defaults
# ─── AI Enhancement (Optional) ───────────────────────────────────
class AIEnhancer:
    """Optional AI message enhancement using external LLM API.

    Enriches template-generated messages with context and suggestions.
    Falls back to the original message (callers receive ``None``) when AI
    is unconfigured or the remote call fails.
    """

    SYSTEM_PROMPT = """You are a Proxmox system administrator assistant.
You receive a notification message about a server event and must enhance it with:
1. A brief explanation of what this means in practical terms
2. A suggested action if applicable (1-2 sentences max)
Keep the response concise (max 3 sentences total). Do not repeat the original message.
Respond in the same language as the input message."""

    def __init__(self, provider: str, api_key: str, model: str = ''):
        self.provider = provider.lower()
        self.api_key = api_key
        self.model = model
        # Without an API key there is nothing to call: stay disabled.
        self._enabled = bool(api_key)

    @property
    def enabled(self) -> bool:
        """True when an API key is configured and enhancement may run."""
        return self._enabled

    def enhance(self, title: str, body: str, severity: str) -> Optional[str]:
        """Enhance a notification message with AI context.

        Returns enhanced body text, or None if enhancement fails, is
        disabled, or the provider is unsupported.
        """
        if not self._enabled:
            return None
        try:
            if self.provider in ('openai', 'groq'):
                return self._call_openai_compatible(title, body, severity)
        except Exception as e:
            # Best-effort: never let AI failures break notification delivery.
            print(f"[AIEnhancer] Enhancement failed: {e}")
        return None

    def _call_openai_compatible(self, title: str, body: str, severity: str) -> Optional[str]:
        """Call OpenAI-compatible API (works with OpenAI, Groq, local)."""
        # Endpoint and default model per provider; unknown providers are
        # treated as OpenAI, matching the original if/else fall-through.
        endpoints = {
            'groq': ('https://api.groq.com/openai/v1/chat/completions',
                     'llama-3.3-70b-versatile'),
            'openai': ('https://api.openai.com/v1/chat/completions',
                       'gpt-4o-mini'),
        }
        url, default_model = endpoints.get(self.provider, endpoints['openai'])
        model = self.model or default_model

        user_msg = f"Severity: {severity}\nTitle: {title}\nMessage: {body}"
        request_body = {
            'model': model,
            'messages': [
                {'role': 'system', 'content': self.SYSTEM_PROMPT},
                {'role': 'user', 'content': user_msg},
            ],
            'max_tokens': 150,
            'temperature': 0.3,
        }
        req = urllib.request.Request(
            url,
            data=json.dumps(request_body).encode('utf-8'),
            headers={
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {self.api_key}',
            },
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            result = json.loads(resp.read().decode('utf-8'))
            content = result['choices'][0]['message']['content'].strip()
            # Empty completions are treated as "no enhancement".
            return content if content else None
def format_with_ai(title: str, body: str, severity: str,
                   ai_config: Dict[str, str]) -> str:
    """Format a message with optional AI enhancement.

    If AI is configured and succeeds, appends AI insight to the body.
    Otherwise returns the original body unchanged.

    Args:
        title: Notification title
        body: Notification body
        severity: Severity level
        ai_config: {'enabled': 'true', 'provider': 'groq', 'api_key': '...', 'model': ''}

    Returns:
        Enhanced body string
    """
    api_key = ai_config.get('api_key')
    if ai_config.get('enabled') != 'true' or not api_key:
        # AI disabled or no key configured: pass the body through untouched.
        return body
    insight = AIEnhancer(
        provider=ai_config.get('provider', 'groq'),
        api_key=api_key,
        model=ai_config.get('model', ''),
    ).enhance(title, body, severity)
    return f"{body}\n\n---\n{insight}" if insight else body