""" ProxMenux Notification Channels Provides transport adapters for Telegram, Gotify, and Discord. Each channel implements send() and test() with: - Retry with exponential backoff (3 attempts) - Request timeout of 10s - Rate limiting (max 30 msg/min per channel) Author: MacRimi """ import json import logging import time import urllib.request import urllib.error import urllib.parse from abc import ABC, abstractmethod from collections import deque from typing import Tuple, Optional, Dict, Any, List # Server-side defense-in-depth for user-supplied URLs in channel configs. # `notification_manager.validate_external_url` rejects RFC1918 / loopback, # but Gotify is commonly self-hosted on a LAN so we relax that — and only # reject well-known SSRF targets (cloud metadata + the local PVE API). # Audit Tier 6 — sin validación SSRF en URLs de webhooks/canales. _KNOWN_SSRF_TARGETS = { '169.254.169.254', # AWS/GCE/Azure metadata 'metadata.google.internal', 'metadata.aws.internal', } _BLOCKED_LOOPBACK_PORTS = {'8006', '8007'} # PVE API HTTPS / HTTPS-alt def _validate_user_webhook_url(url: str) -> Tuple[bool, str]: """Lightweight SSRF guard for Gotify-style channels. Allows RFC1918 / loopback hosts (legit self-hosting), but rejects: - schemes other than http(s) - cloud-metadata IPs and well-known internal hostnames - loopback paired with the PVE API ports — typical pivot target """ if not isinstance(url, str) or not url: return False, "URL is required" try: parsed = urllib.parse.urlparse(url.strip()) except ValueError: return False, "URL is malformed" if parsed.scheme not in ('http', 'https'): return False, "Only http:// and https:// are accepted" host = (parsed.hostname or '').lower() if not host: return False, "URL is missing a hostname" if host in _KNOWN_SSRF_TARGETS: return False, f"Host {host} is a known cloud-metadata endpoint" port = parsed.port if (host in ('localhost', '127.0.0.1', '::1') and str(port or '') in _BLOCKED_LOOPBACK_PORTS): return False, f"Cannot point at the local PVE API ({host}:{port})" return True, "" # ─── Rate Limiter ──────────────────────────────────────────────── class RateLimiter: """Token-bucket rate limiter: max N messages per window. Thread-safe: `allow()` and `wait_time()` are called from the dispatch thread plus channel test paths concurrently. Without the lock the deque could throw IndexError on concurrent popleft / append, and the count could go inconsistent. Audit Tier 6 (Notification stack — `RateLimiter.allow()` no thread-safe). """ def __init__(self, max_calls: int = 30, window_seconds: int = 60): import threading as _threading self.max_calls = max_calls self.window = window_seconds self._timestamps: deque = deque() self._lock = _threading.Lock() # Counter of events dropped while over the rate limit. Surfaced via # `consume_drop_count()` so the dispatch loop can periodically log # "X events suppressed by rate-limit" instead of letting them # disappear silently. Audit Tier 6 — `RateLimiter` descarta # silenciosamente eventos sobre el límite. self._dropped: int = 0 def allow(self) -> bool: now = time.monotonic() with self._lock: while self._timestamps and now - self._timestamps[0] > self.window: self._timestamps.popleft() if len(self._timestamps) >= self.max_calls: self._dropped += 1 return False self._timestamps.append(now) return True def consume_drop_count(self) -> int: """Return the number of drops since the last call and reset to 0.""" with self._lock: n = self._dropped self._dropped = 0 return n def wait_time(self) -> float: with self._lock: if not self._timestamps: return 0.0 return max(0.0, self.window - (time.monotonic() - self._timestamps[0])) # ─── Base Channel ──────────────────────────────────────────────── class NotificationChannel(ABC): """Abstract base for all notification channels.""" MAX_RETRIES = 3 RETRY_DELAYS = [2, 4, 8] # exponential backoff seconds REQUEST_TIMEOUT = 10 def __init__(self): self._rate_limiter = RateLimiter(max_calls=30, window_seconds=60) @abstractmethod def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: """Send a notification. Returns {success, error, channel}.""" pass @abstractmethod def test(self) -> Tuple[bool, str]: """Send a test message. Returns (success, error_message).""" pass @abstractmethod def validate_config(self) -> Tuple[bool, str]: """Check if config is valid without sending. Returns (valid, error).""" pass def _http_request(self, url: str, data: bytes, headers: Dict[str, str], method: str = 'POST') -> Tuple[int, str]: """Execute HTTP request with timeout. Returns (status_code, body).""" # Ensure User-Agent is set to avoid Cloudflare 1010 errors if 'User-Agent' not in headers: headers['User-Agent'] = 'ProxMenux-Monitor/1.1' req = urllib.request.Request(url, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=self.REQUEST_TIMEOUT) as resp: body = resp.read().decode('utf-8', errors='replace') return resp.status, body except urllib.error.HTTPError as e: body = e.read().decode('utf-8', errors='replace') if e.fp else str(e) return e.code, body except urllib.error.URLError as e: return 0, str(e.reason) except Exception as e: return 0, str(e) def _send_with_retry(self, send_fn) -> Dict[str, Any]: """Wrap a send function with rate limiting and retry logic.""" if not self._rate_limiter.allow(): wait = self._rate_limiter.wait_time() # Surface the cumulative drop count every ~10 events so the # operator notices that they're losing notifications. Calling # consume_drop_count() resets the counter so the next bucket # of drops gets its own summary. try: dropped = self._rate_limiter.consume_drop_count() if dropped >= 10: print(f"[{self.__class__.__name__}] Rate-limit suppressed {dropped} events in the last window") except Exception: pass return { 'success': False, 'error': f'Rate limited. Retry in {wait:.0f}s', 'rate_limited': True } last_error = '' for attempt in range(self.MAX_RETRIES): try: status, body = send_fn() if 200 <= status < 300: return {'success': True, 'error': None} last_error = f'HTTP {status}: {body[:200]}' except Exception as e: last_error = str(e) if attempt < self.MAX_RETRIES - 1: time.sleep(self.RETRY_DELAYS[attempt]) return {'success': False, 'error': last_error} # ─── Telegram ──────────────────────────────────────────────────── class TelegramChannel(NotificationChannel): """Telegram Bot API channel using HTML parse mode.""" API_BASE = 'https://api.telegram.org/bot{token}/sendMessage' API_PHOTO = 'https://api.telegram.org/bot{token}/sendPhoto' MAX_LENGTH = 4096 SEVERITY_ICONS = { 'CRITICAL': '\U0001F534', # red circle 'WARNING': '\U0001F7E1', # yellow circle 'INFO': '\U0001F535', # blue circle 'OK': '\U0001F7E2', # green circle 'UNKNOWN': '\u26AA', # white circle } def __init__(self, bot_token: str, chat_id: str, topic_id: str = ''): super().__init__() token = bot_token.strip() # Strip 'bot' prefix if user included it (API_BASE already adds it) if token.lower().startswith('bot') and ':' in token[3:]: token = token[3:] self.bot_token = token self.chat_id = chat_id.strip() # Topic ID for supergroups with topics enabled (message_thread_id) self.topic_id = topic_id.strip() if topic_id else '' def validate_config(self) -> Tuple[bool, str]: if not self.bot_token: return False, 'Bot token is required' if not self.chat_id: return False, 'Chat ID is required' if ':' not in self.bot_token: return False, 'Invalid bot token format (expected BOT_ID:TOKEN)' return True, '' def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: icon = self.SEVERITY_ICONS.get(severity, self.SEVERITY_ICONS['INFO']) html_msg = f"{icon} {self._escape_html(title)}\n\n{self._escape_html(message)}" # Split long messages chunks = self._split_message(html_msg) result = {'success': True, 'error': None, 'channel': 'telegram'} for chunk in chunks: res = self._send_with_retry(lambda c=chunk: self._post_message(c)) if not res['success']: result = {**res, 'channel': 'telegram'} break return result def send_photo(self, photo_url: str, caption: str = '') -> Dict[str, Any]: """Send a photo to Telegram chat.""" url = self.API_PHOTO.format(token=self.bot_token) payload = { 'chat_id': self.chat_id, 'photo': photo_url, } # Add topic ID for supergroups with topics enabled if self.topic_id: try: payload['message_thread_id'] = int(self.topic_id) except ValueError: pass if caption: payload['caption'] = caption[:1024] # Telegram caption limit payload['parse_mode'] = 'HTML' body = json.dumps(payload).encode() headers = {'Content-Type': 'application/json'} result = self._send_with_retry( lambda: self._http_request(url, body, headers) ) result['channel'] = 'telegram' return result def test(self) -> Tuple[bool, str]: valid, err = self.validate_config() if not valid: return False, err result = self.send( 'ProxMenux Test', 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', 'INFO' ) return result['success'], result.get('error', '') def _post_message(self, text: str) -> Tuple[int, str]: url = self.API_BASE.format(token=self.bot_token) payload_dict = { 'chat_id': self.chat_id, 'text': text, 'parse_mode': 'HTML', 'disable_web_page_preview': True, } # Add topic ID for supergroups with topics enabled if self.topic_id: try: payload_dict['message_thread_id'] = int(self.topic_id) except ValueError: pass # Invalid topic_id, skip payload = json.dumps(payload_dict).encode('utf-8') return self._http_request(url, payload, {'Content-Type': 'application/json'}) def _split_message(self, text: str) -> list: if len(text) <= self.MAX_LENGTH: return [text] chunks = [] while text: if len(text) <= self.MAX_LENGTH: chunks.append(text) break split_at = text.rfind('\n', 0, self.MAX_LENGTH) if split_at == -1: split_at = self.MAX_LENGTH chunks.append(text[:split_at]) text = text[split_at:].lstrip('\n') return chunks @staticmethod def _escape_html(text: str) -> str: return (text .replace('&', '&') .replace('<', '<') .replace('>', '>')) # ─── Gotify ────────────────────────────────────────────────────── class GotifyChannel(NotificationChannel): """Gotify push notification channel with priority mapping.""" PRIORITY_MAP = { 'OK': 1, 'INFO': 2, 'UNKNOWN': 3, 'WARNING': 5, 'CRITICAL': 10, } def __init__(self, server_url: str, app_token: str): super().__init__() self.server_url = server_url.rstrip('/').strip() self.app_token = app_token.strip() def validate_config(self) -> Tuple[bool, str]: if not self.server_url: return False, 'Server URL is required' if not self.app_token: return False, 'Application token is required' ok, err = _validate_user_webhook_url(self.server_url) if not ok: return False, f'Invalid Gotify URL: {err}' return True, '' def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: priority = self.PRIORITY_MAP.get(severity, 2) result = self._send_with_retry( lambda: self._post_message(title, message, priority) ) result['channel'] = 'gotify' return result def test(self) -> Tuple[bool, str]: valid, err = self.validate_config() if not valid: return False, err result = self.send( 'ProxMenux Test', 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', 'INFO' ) return result['success'], result.get('error', '') def _post_message(self, title: str, message: str, priority: int) -> Tuple[int, str]: url = f"{self.server_url}/message?token={self.app_token}" payload = json.dumps({ 'title': title, 'message': message, 'priority': priority, 'extras': { 'client::display': {'contentType': 'text/markdown'} } }).encode('utf-8') return self._http_request(url, payload, {'Content-Type': 'application/json'}) # ─── Discord ───────────────────────────────────────────────────── class DiscordChannel(NotificationChannel): """Discord webhook channel with color-coded embeds.""" MAX_EMBED_DESC = 2048 SEVERITY_COLORS = { 'CRITICAL': 0xED4245, # red 'WARNING': 0xFEE75C, # yellow 'INFO': 0x5865F2, # blurple 'OK': 0x57F287, # green 'UNKNOWN': 0x99AAB5, # grey } def __init__(self, webhook_url: str): super().__init__() self.webhook_url = webhook_url.strip() _DISCORD_HOSTS = { 'discord.com', 'discordapp.com', 'ptb.discord.com', 'canary.discord.com', } def validate_config(self) -> Tuple[bool, str]: if not self.webhook_url: return False, 'Webhook URL is required' # Substring match (`'discord.com/api/webhooks/' in url`) accepted # crafted URLs like `http://attacker.example/proxy?u=https://discord.com/api/webhooks/...`. # Parse properly: require https + exact discord hostname + the # /api/webhooks// path. try: from urllib.parse import urlparse as _urlparse parsed = _urlparse(self.webhook_url) except Exception: return False, 'Invalid Discord webhook URL' if parsed.scheme != 'https': return False, 'Discord webhook must use https://' if (parsed.hostname or '').lower() not in self._DISCORD_HOSTS: return False, 'Invalid Discord webhook URL (host must be discord.com)' if not parsed.path.startswith('/api/webhooks/'): return False, 'Invalid Discord webhook URL (path must be /api/webhooks/...)' return True, '' def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: color = self.SEVERITY_COLORS.get(severity, 0x5865F2) desc = message[:self.MAX_EMBED_DESC] if len(message) > self.MAX_EMBED_DESC else message embed = { 'title': title, 'description': desc, 'color': color, 'footer': {'text': 'ProxMenux Monitor'}, 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), } # Use structured fields from render_template if available rendered_fields = (data or {}).get('_rendered_fields', []) if rendered_fields: embed['fields'] = [ {'name': name, 'value': val[:1024], 'inline': True} for name, val in rendered_fields[:25] # Discord limit: 25 fields ] elif data: fields = [] if data.get('category'): fields.append({'name': 'Category', 'value': data['category'], 'inline': True}) if data.get('hostname'): fields.append({'name': 'Host', 'value': data['hostname'], 'inline': True}) if data.get('severity'): fields.append({'name': 'Severity', 'value': data['severity'], 'inline': True}) if fields: embed['fields'] = fields result = self._send_with_retry( lambda: self._post_webhook(embed) ) result['channel'] = 'discord' return result def test(self) -> Tuple[bool, str]: valid, err = self.validate_config() if not valid: return False, err result = self.send( 'ProxMenux Test', 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', 'INFO' ) return result['success'], result.get('error', '') def _post_webhook(self, embed: Dict) -> Tuple[int, str]: payload = json.dumps({ 'username': 'ProxMenux', 'embeds': [embed] }).encode('utf-8') return self._http_request( self.webhook_url, payload, {'Content-Type': 'application/json'} ) # ─── Email Channel ────────────────────────────────────────────── class EmailChannel(NotificationChannel): """Email notification channel using SMTP (smtplib) or sendmail fallback. Config keys: host, port, username, password, tls_mode (none|starttls|ssl), from_address, to_addresses (comma-separated), subject_prefix, timeout """ def __init__(self, config: Dict[str, str]): super().__init__() self.host = (config.get('host', '') or '').strip() self.port = int(config.get('port', 587) or 587) self.username = config.get('username', '') or '' self.password = config.get('password', '') or '' # `dict.get(k, default)` only returns default when the key is MISSING; # if the user previously saved an empty string or null, we'd end up # with `tls_mode=''` and silently skip STARTTLS — which causes # `SMTPNotSupportedError: SMTP AUTH extension not supported by server` # on Gmail/Outlook because they only advertise AUTH post-STARTTLS. tls_raw = (config.get('tls_mode') or 'starttls').strip().lower() if tls_raw not in ('none', 'starttls', 'ssl'): tls_raw = 'starttls' self.tls_mode = tls_raw self.from_address = config.get('from_address', '') or '' self.to_addresses = self._parse_recipients(config.get('to_addresses', '')) self.subject_prefix = config.get('subject_prefix', '[ProxMenux]') or '[ProxMenux]' self.timeout = int(config.get('timeout', 10) or 10) @staticmethod def _parse_recipients(raw) -> list: if isinstance(raw, list): return [a.strip() for a in raw if a.strip()] return [addr.strip() for addr in str(raw).split(',') if addr.strip()] def validate_config(self) -> Tuple[bool, str]: if not self.to_addresses: return False, 'No recipients configured' if not self.from_address: return False, 'No from address configured' # Credentials without an explicit SMTP host would silently fall back to # `/usr/sbin/sendmail`, which ignores username/password entirely — the # test returns OK because Postfix queued the message, but the relay is # never authenticated and the mail rots in the local mailq. Reported by # Ignacio Seijo: "dejando host/puerto en blanco el test pasa pero el # correo nunca llega". if (self.username or self.password) and not self.host: return False, ('SMTP credentials provided but no host configured. ' 'Set host (e.g. smtp.gmail.com) and port (587) — ' 'without a host the message goes to the local MTA ' 'and your username/password are ignored.') # Must have SMTP host OR local sendmail available if not self.host: import os if not os.path.exists('/usr/sbin/sendmail'): return False, 'No SMTP host configured and /usr/sbin/sendmail not found' # Reject configurations that would send credentials in cleartext over # the network. Loopback (`localhost` / `127.0.0.1`) and the local-only # sendmail path are exempt — those don't traverse a wire that an # attacker could sniff. Audit Tier 6 (Notification stack — SMTP TLS). host_lower = (self.host or '').lower() is_local = host_lower in ('', 'localhost', 'localhost.localdomain', '127.0.0.1', '::1') if (self.tls_mode == 'none' and self.username and self.password and not is_local): return False, ('SMTP TLS is disabled but credentials would travel over plain ' 'text. Use STARTTLS or SSL/TLS, or remove the username/password.') return True, '' def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: subject = f"{self.subject_prefix} [{severity}] {title}" def _do_send(): if self.host: return self._send_smtp(subject, message, severity, data) else: return self._send_sendmail(subject, message, severity, data) return self._send_with_retry(_do_send) def _send_smtp(self, subject: str, body: str, severity: str, data: Optional[Dict] = None) -> Tuple[int, str]: import smtplib from email.message import EmailMessage msg = EmailMessage() msg['Subject'] = subject msg['From'] = self.from_address msg['To'] = ', '.join(self.to_addresses) msg.set_content(body) # Add HTML alternative html_body = self._format_html(subject, body, severity, data) if html_body: msg.add_alternative(html_body, subtype='html') server = None try: import ssl as _ssl if self.tls_mode == 'ssl': ctx = _ssl.create_default_context() server = smtplib.SMTP_SSL(self.host, self.port, timeout=self.timeout, context=ctx) server.ehlo() else: server = smtplib.SMTP(self.host, self.port, timeout=self.timeout) server.ehlo() if self.tls_mode == 'starttls': ctx = _ssl.create_default_context() server.starttls(context=ctx) server.ehlo() # Re-identify after TLS -- server re-announces AUTH if self.username and self.password: # If the server doesn't advertise AUTH after our EHLO sequence, # smtplib's `login()` raises `SMTPNotSupportedError` with the # opaque message "SMTP AUTH extension not supported by server". # That fired for users who left tls_mode blank or pointed at # port 587 without STARTTLS — Gmail only advertises AUTH after # the TLS handshake. Surface the real reason here. if not server.has_extn('auth'): hint = ( f"server={self.host}:{self.port} tls_mode={self.tls_mode}" ) if self.tls_mode == 'none': return 0, ( 'SMTP server did not advertise AUTH after EHLO. ' 'TLS is disabled — most providers (Gmail, Outlook, ' 'Office365) only allow login after STARTTLS or SSL. ' f'Switch TLS Mode to STARTTLS (port 587) or SSL/TLS ' f'(port 465). [{hint}]' ) return 0, ( 'SMTP server did not advertise AUTH after EHLO. ' 'Verify the host/port/TLS combination. For Gmail use ' 'smtp.gmail.com:587 with STARTTLS and an App Password ' '(https://myaccount.google.com/apppasswords); for ' f'Outlook use smtp.office365.com:587 with STARTTLS. [{hint}]' ) server.login(self.username, self.password) server.send_message(msg) server.quit() server = None return 200, 'OK' except smtplib.SMTPAuthenticationError as e: return 0, f'SMTP authentication failed (check username/password or app-specific password): {e}' except smtplib.SMTPNotSupportedError as e: return 0, (f'SMTP AUTH not supported by server. ' f'TLS mode: {self.tls_mode}, port: {self.port}. ' f'Gmail/Outlook require STARTTLS on 587 or SSL/TLS on 465. ' f'For Gmail, generate an App Password at ' f'https://myaccount.google.com/apppasswords. Detail: {e}') except smtplib.SMTPConnectError as e: return 0, f'SMTP connection failed: {e}' except smtplib.SMTPException as e: return 0, f'SMTP error: {e}' except _ssl.SSLError as e: return 0, f'TLS/SSL error (check TLS mode and port): {e}' except (OSError, TimeoutError) as e: return 0, f'Connection error: {e}' finally: if server: try: server.quit() except Exception: pass def _send_sendmail(self, subject: str, body: str, severity: str, data: Optional[Dict] = None) -> Tuple[int, str]: import os import subprocess from email.message import EmailMessage sendmail = '/usr/sbin/sendmail' if not os.path.exists(sendmail): return 0, 'sendmail not found at /usr/sbin/sendmail' msg = EmailMessage() msg['Subject'] = subject msg['From'] = self.from_address or 'proxmenux@localhost' msg['To'] = ', '.join(self.to_addresses) msg.set_content(body) # Add HTML alternative html_body = self._format_html(subject, body, severity, data) if html_body: msg.add_alternative(html_body, subtype='html') try: proc = subprocess.run( [sendmail, '-t', '-oi'], input=msg.as_string(), capture_output=True, text=True, timeout=30 ) if proc.returncode == 0: return 200, 'OK' return 0, f'sendmail failed (rc={proc.returncode}): {proc.stderr[:200]}' except subprocess.TimeoutExpired: return 0, 'sendmail timed out after 30s' except Exception as e: return 0, f'sendmail error: {e}' # Severity -> accent colour + label _SEV_STYLE = { 'CRITICAL': {'color': '#dc2626', 'bg': '#fef2f2', 'border': '#fecaca', 'label': 'Critical'}, 'WARNING': {'color': '#d97706', 'bg': '#fffbeb', 'border': '#fde68a', 'label': 'Warning'}, 'INFO': {'color': '#2563eb', 'bg': '#eff6ff', 'border': '#bfdbfe', 'label': 'Information'}, 'OK': {'color': '#16a34a', 'bg': '#f0fdf4', 'border': '#bbf7d0', 'label': 'Resolved'}, } _SEV_DEFAULT = {'color': '#6b7280', 'bg': '#f9fafb', 'border': '#e5e7eb', 'label': 'Notice'} # Group -> human-readable section header for the email _GROUP_LABELS = { 'vm_ct': 'Virtual Machine / Container', 'backup': 'Backup & Snapshot', 'resources': 'System Resources', 'storage': 'Storage', 'network': 'Network', 'security': 'Security', 'cluster': 'Cluster', 'services': 'System Services', 'health': 'Health Monitor', 'updates': 'System Updates', 'other': 'System Notification', } def _format_html(self, subject: str, body: str, severity: str, data: Optional[Dict] = None) -> str: """Build a professional HTML email with structured data sections.""" import html as html_mod import time as _time data = data or {} sev = self._SEV_STYLE.get(severity, self._SEV_DEFAULT) # Determine group for section header event_type = data.get('_event_type', '') group = data.get('_group', 'other') section_label = self._GROUP_LABELS.get(group, 'System Notification') # Timestamp ts = data.get('timestamp', '') or _time.strftime('%Y-%m-%d %H:%M:%S UTC', _time.gmtime()) # ── Build structured detail rows from known data fields ── detail_rows = self._build_detail_rows(data, event_type, group, html_mod) # ── Fallback: if no structured rows, render body text lines ── if not detail_rows: for line in body.split('\n'): stripped = line.strip() if not stripped: continue # Try to split "Label: value" patterns if ':' in stripped: lbl, _, val = stripped.partition(':') if val.strip() and len(lbl) < 40: detail_rows.append((html_mod.escape(lbl.strip()), html_mod.escape(val.strip()))) continue detail_rows.append(('', html_mod.escape(stripped))) # ── Render detail rows as HTML table ── rows_html = '' for label, value in detail_rows: if label: rows_html += f''' {label} {value} ''' else: # Full-width row (no label, just description text) rows_html += f''' {value} ''' # ── Reason / details block (long text, displayed separately) ── reason = data.get('reason', '') reason_html = '' if reason and len(reason) > 80: reason_html = f'''

Details

{html_mod.escape(reason)}

''' # ── Clean subject for display (remove prefix if present) ── display_title = subject for prefix in [self.subject_prefix, '[CRITICAL]', '[WARNING]', '[INFO]', '[OK]']: display_title = display_title.replace(prefix, '').strip() return f'''

ProxMenux Monitor

{html_mod.escape(section_label)} Report

{sev['label'].upper()}

{html_mod.escape(display_title)}

Host: {html_mod.escape(data.get('hostname', ''))} {html_mod.escape(ts)}
{rows_html}
{reason_html}
ProxMenux Notification Service proxmenux.com
''' @staticmethod def _build_detail_rows(data: Dict, event_type: str, group: str, html_mod) -> list: """Build structured (label, value) rows from event data. Returns list of (label_html, value_html) tuples. An empty label means a full-width descriptive row. """ esc = html_mod.escape rows = [] def _add(label: str, value, fmt: str = ''): """Add a row if value is truthy.""" v = str(value).strip() if value else '' if not v or v == '0' and label not in ('Failures',): return if fmt == 'severity': sev_colors = { 'CRITICAL': '#dc2626', 'WARNING': '#d97706', 'INFO': '#2563eb', 'OK': '#16a34a', } c = sev_colors.get(v, '#6b7280') rows.append((esc(label), f'{esc(v)}')) elif fmt == 'code': rows.append((esc(label), f'{esc(v)}')) elif fmt == 'bold': rows.append((esc(label), f'{esc(v)}')) else: rows.append((esc(label), esc(v))) # ── Common fields present in most events ── # ── VM / CT events ── if group == 'vm_ct': _add('VM/CT ID', data.get('vmid'), 'code') _add('Name', data.get('vmname'), 'bold') _add('Action', event_type.replace('_', ' ').replace('vm ', 'VM ').replace('ct ', 'CT ').title()) _add('Target Node', data.get('target_node')) _add('Reason', data.get('reason')) # ── Backup events ── elif group == 'backup': _add('VM/CT ID', data.get('vmid'), 'code') _add('Name', data.get('vmname'), 'bold') _add('Status', 'Failed' if 'fail' in event_type else 'Completed' if 'complete' in event_type else 'Started', 'severity' if 'fail' in event_type else '') _add('Size', data.get('size')) _add('Duration', data.get('duration')) _add('Snapshot', data.get('snapshot_name'), 'code') # For backup_complete/fail with parsed body, add short reason only reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Resources ── elif group == 'resources': _add('Metric', event_type.replace('_', ' ').title()) _add('Current Value', data.get('value'), 'bold') _add('Threshold', data.get('threshold')) _add('CPU Cores', data.get('cores')) _add('Memory', f"{data.get('used', '')} / {data.get('total', '')}" if data.get('used') else '') _add('Temperature', f"{data.get('value')}C" if 'temp' in event_type else '') # ── Storage ── elif group == 'storage': if 'disk_space' in event_type: _add('Mount Point', data.get('mount'), 'code') _add('Usage', f"{data.get('used')}%", 'bold') _add('Available', data.get('available')) elif 'io_error' in event_type: _add('Device', data.get('device'), 'code') _add('Severity', data.get('severity', ''), 'severity') elif 'unavailable' in event_type: _add('Storage Name', data.get('storage_name'), 'bold') _add('Type', data.get('storage_type'), 'code') reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Network ── elif group == 'network': _add('Interface', data.get('interface'), 'code') _add('Latency', f"{data.get('value')}ms" if data.get('value') else '') _add('Threshold', f"{data.get('threshold')}ms" if data.get('threshold') else '') reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Security ── elif group == 'security': _add('Event', event_type.replace('_', ' ').title()) _add('Source IP', data.get('source_ip'), 'code') _add('Username', data.get('username'), 'code') _add('Service', data.get('service')) _add('Jail', data.get('jail'), 'code') _add('Failures', data.get('failures')) _add('Change', data.get('change_details')) # ── Cluster ── elif group == 'cluster': _add('Event', event_type.replace('_', ' ').title()) _add('Node', data.get('node_name'), 'bold') _add('Quorum', data.get('quorum')) _add('Nodes Affected', data.get('entity_list')) # ── Services ── elif group == 'services': _add('Service', data.get('service_name'), 'code') _add('Process', data.get('process'), 'code') _add('Event', event_type.replace('_', ' ').title()) reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Health monitor ── elif group == 'health': _add('Category', data.get('category'), 'bold') _add('Severity', data.get('severity', ''), 'severity') if data.get('original_severity'): _add('Previous Severity', data.get('original_severity'), 'severity') _add('Duration', data.get('duration')) _add('Active Issues', data.get('count')) reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Updates ── elif group == 'updates': _add('Total Updates', data.get('total_count'), 'bold') _add('Security Updates', data.get('security_count')) _add('Proxmox Updates', data.get('pve_count')) _add('Kernel Updates', data.get('kernel_count')) imp = data.get('important_list', '') if imp and imp != 'none': # Render each package on its own line inside a single cell pkg_lines = [l.strip() for l in imp.split('\n') if l.strip()] if pkg_lines: pkg_html = '
'.join( f'{esc(p)}' for p in pkg_lines ) rows.append((esc('Important Packages'), pkg_html)) _add('Current Version', data.get('current_version'), 'code') _add('New Version', data.get('new_version'), 'code') # ── Other / unknown ── else: reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) return rows def test(self) -> Tuple[bool, str]: # Lazy import to avoid a circular dependency with notification_manager, # which already imports from this module at load time. from notification_manager import _resolve_display_hostname hostname = _resolve_display_hostname() result = self.send( 'ProxMenux Test Notification', 'This is a test notification from ProxMenux Monitor.\n' 'If you received this, your email channel is working correctly.', 'INFO', data={ 'hostname': hostname, '_event_type': 'webhook_test', '_group': 'other', 'reason': 'Email notification channel connectivity verified successfully. ' 'You will receive alerts from ProxMenux Monitor at this address.', } ) return result.get('success', False), result.get('error', '') # ─── Apprise ───────────────────────────────────────────────────── class _AppriseLogCapture(logging.Handler): """Buffers records emitted by the `apprise` logger during a single notify() call so the surrounding channel can surface the real failure reason — e.g. "error=400" plus the destination's response body — instead of the opaque "transport failure" string apprise.notify() leaves behind on a False return. Captures everything at DEBUG so the response body (which apprise's custom_json plugin logs only at DEBUG) is available; `summary()` keeps the output bounded for UI display.""" def __init__(self) -> None: super().__init__() self.records: List[logging.LogRecord] = [] def emit(self, record: logging.LogRecord) -> None: try: self.records.append(record) except Exception: pass def summary(self) -> str: """Concise digest of the captured records — WARNING+ messages first (the failure reason), then a single "Response Details" DEBUG line if present (the destination's reply body, useful for decoding 400s like `{"error": "field X missing"}`). Capped per line so a noisy plugin can't blow past the 200-char truncation `_send_with_retry` applies on the way out.""" warn_msgs: List[str] = [] response_body: str = '' for r in self.records: try: msg = r.getMessage() except Exception: continue if not msg: continue if r.levelno >= logging.WARNING: if msg not in warn_msgs: warn_msgs.append(msg[:160]) elif 'Response Details' in msg and not response_body: # Plugin logs the body as `Response Details:\r\n%r` — the # %r already wraps the bytes in repr(b'…'), strip it for # readability. body = msg.split('Response Details:', 1)[1].strip() if body.startswith(("b'", 'b"')): body = body[2:] if body.endswith(("'", '"')): body = body[:-1] body = body.replace('\\r\\n', ' ').replace('\\n', ' ').strip() if body: response_body = body[:300] parts: List[str] = [] if warn_msgs: parts.extend(warn_msgs) if response_body: parts.append(f'response: {response_body}') return ' | '.join(parts) class AppriseChannel(NotificationChannel): """Apprise meta-channel — a single URL talks to ~80 services. Apprise (https://github.com/caronc/apprise) is a Python library that normalises a wide catalogue of notification destinations behind a single URL scheme: `tgram://`, `discord://`, `slack://`, `gotify://`, `ntfy://`, `matrix://`, `mailto://`, `pushover://`, `signal://`, etc. The operator pastes one URL and ProxMenux delegates the transport. Requested in issue #207 by @0berkampf. Implemented as a *separate channel type* (not a replacement for the native Telegram / Gotify / Discord / Email channels), so installs that already have a working native channel don't need to migrate — Apprise is opt-in for users who want to reach a service we don't support natively. The library is loaded lazily on first send. Older deployments that haven't installed it yet surface a clean validation error instead of crashing the notification manager at import time. """ def __init__(self, url: str): super().__init__() self.url = (url or '').strip() # Lazy import so installs that haven't picked up the new dep yet # don't crash on module load. Each call re-imports cheaply — Python # caches the module reference after the first hit. def _load_apprise(self): try: import apprise # type: ignore return apprise except ImportError: return None def validate_config(self) -> Tuple[bool, str]: if not self.url: return False, 'Apprise URL is required' apprise = self._load_apprise() if apprise is None: return False, ( 'apprise library not installed in this deployment. ' 'Reinstall ProxMenux Monitor or run `pip install apprise` ' 'inside the AppImage environment.' ) # `add(url)` returns True only if Apprise recognised the scheme # — useful as a syntactic validation without sending anything. try: apobj = apprise.Apprise() ok = apobj.add(self.url) if not ok: return False, 'Apprise rejected the URL (unrecognised scheme or bad format)' except Exception as e: return False, f'Apprise rejected the URL: {e}' return True, '' def _severity_to_notify_type(self, apprise_mod, severity: str): """Map ProxMenux severities to Apprise NotifyType constants so services that render severity (e.g. Pushover priority, ntfy priority headers) get the right indicator.""" sev = (severity or '').upper() if sev == 'CRITICAL': return apprise_mod.NotifyType.FAILURE if sev == 'WARNING': return apprise_mod.NotifyType.WARNING if sev == 'SUCCESS': return apprise_mod.NotifyType.SUCCESS return apprise_mod.NotifyType.INFO def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: ok, err = self.validate_config() if not ok: return {'success': False, 'error': err, 'channel': 'apprise'} # Rate limit (shared with the other channels) before dispatch. def _send_via_apprise() -> Tuple[int, str]: apprise = self._load_apprise() if apprise is None: # Shouldn't happen — validate_config caught it above — # but defend in depth so the retry loop reports cleanly. return 0, 'apprise library not available' # Capture Apprise's internal logger during notify(). When the # plugin (jsons://, ntfy://, slack://, ...) gets a non-2xx # from the destination it logs at WARNING with the HTTP # status code — e.g. "Failed to send JSON POST notification: # error=400.". Without this capture, `notify()` just returns # False and we'd surface a useless "transport failure" with # no clue why. Reported by a beta user on 2026-05-30: jsons:// # → HTTP 400 from their webhook, no way to see the 400 in # the Monitor UI. apprise_logger = logging.getLogger('apprise') handler = _AppriseLogCapture() handler.setLevel(logging.DEBUG) prev_level = apprise_logger.level apprise_logger.addHandler(handler) # Drop the logger to DEBUG only while notify() runs so we # also capture the destination's response body (apprise # plugins emit that line at DEBUG). _AppriseLogCapture.summary # caps the included output, so this doesn't flood the UI. apprise_logger.setLevel(logging.DEBUG) try: apobj = apprise.Apprise() apobj.add(self.url) sent = apobj.notify( body=message or '', title=title or '', notify_type=self._severity_to_notify_type(apprise, severity), ) except Exception as e: apprise_logger.removeHandler(handler) apprise_logger.setLevel(prev_level) return 0, str(e) apprise_logger.removeHandler(handler) apprise_logger.setLevel(prev_level) if sent: return 200, '' # `notify` returns False iff every URL endpoint rejected. # Surface the warnings the apprise plugin emitted so the # operator can see the actual HTTP status / reason. detail = handler.summary() if not detail: detail = 'destination rejected the notification (no detail from apprise)' return 500, detail result = self._send_with_retry(_send_via_apprise) result['channel'] = 'apprise' return result def test(self) -> Tuple[bool, str]: result = self.send( title='ProxMenux Monitor — Test', message='Apprise channel is configured correctly. If you can read this, the URL is valid and the service accepted the notification.', severity='INFO', ) return bool(result.get('success')), result.get('error') or '' # ─── Channel Factory ───────────────────────────────────────────── CHANNEL_TYPES = { 'telegram': { 'name': 'Telegram', 'config_keys': ['bot_token', 'chat_id', 'topic_id'], 'class': TelegramChannel, }, 'gotify': { 'name': 'Gotify', 'config_keys': ['url', 'token'], 'class': GotifyChannel, }, 'discord': { 'name': 'Discord', 'config_keys': ['webhook_url'], 'class': DiscordChannel, }, 'email': { 'name': 'Email (SMTP)', 'config_keys': ['host', 'port', 'username', 'password', 'tls_mode', 'from_address', 'to_addresses', 'subject_prefix'], 'class': EmailChannel, }, 'apprise': { 'name': 'Apprise', 'config_keys': ['url'], 'class': AppriseChannel, }, } def create_channel(channel_type: str, config: Dict[str, str]) -> Optional[NotificationChannel]: """Create a channel instance from type name and config dict. Args: channel_type: 'telegram', 'gotify', 'discord', 'email', or 'apprise' config: Dict with channel-specific keys (see CHANNEL_TYPES) Returns: Channel instance or None if creation fails """ try: if channel_type == 'telegram': return TelegramChannel( bot_token=config.get('bot_token', ''), chat_id=config.get('chat_id', ''), topic_id=config.get('topic_id', '') ) elif channel_type == 'gotify': return GotifyChannel( server_url=config.get('url', ''), app_token=config.get('token', '') ) elif channel_type == 'discord': return DiscordChannel( webhook_url=config.get('webhook_url', '') ) elif channel_type == 'email': return EmailChannel(config) elif channel_type == 'apprise': return AppriseChannel(url=config.get('url', '')) except Exception as e: print(f"[NotificationChannels] Failed to create {channel_type}: {e}") return None