""" ProxMenux Notification Channels Provides transport adapters for Telegram, Gotify, and Discord. Each channel implements send() and test() with: - Retry with exponential backoff (3 attempts) - Request timeout of 10s - Rate limiting (max 30 msg/min per channel) Author: MacRimi """ import json import time import urllib.request import urllib.error import urllib.parse from abc import ABC, abstractmethod from collections import deque from typing import Tuple, Optional, Dict, Any # ─── Rate Limiter ──────────────────────────────────────────────── class RateLimiter: """Token-bucket rate limiter: max N messages per window.""" def __init__(self, max_calls: int = 30, window_seconds: int = 60): self.max_calls = max_calls self.window = window_seconds self._timestamps: deque = deque() def allow(self) -> bool: now = time.monotonic() while self._timestamps and now - self._timestamps[0] > self.window: self._timestamps.popleft() if len(self._timestamps) >= self.max_calls: return False self._timestamps.append(now) return True def wait_time(self) -> float: if not self._timestamps: return 0.0 return max(0.0, self.window - (time.monotonic() - self._timestamps[0])) # ─── Base Channel ──────────────────────────────────────────────── class NotificationChannel(ABC): """Abstract base for all notification channels.""" MAX_RETRIES = 3 RETRY_DELAYS = [2, 4, 8] # exponential backoff seconds REQUEST_TIMEOUT = 10 def __init__(self): self._rate_limiter = RateLimiter(max_calls=30, window_seconds=60) @abstractmethod def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: """Send a notification. Returns {success, error, channel}.""" pass @abstractmethod def test(self) -> Tuple[bool, str]: """Send a test message. Returns (success, error_message).""" pass @abstractmethod def validate_config(self) -> Tuple[bool, str]: """Check if config is valid without sending. Returns (valid, error).""" pass def _http_request(self, url: str, data: bytes, headers: Dict[str, str], method: str = 'POST') -> Tuple[int, str]: """Execute HTTP request with timeout. Returns (status_code, body).""" # Ensure User-Agent is set to avoid Cloudflare 1010 errors if 'User-Agent' not in headers: headers['User-Agent'] = 'ProxMenux-Monitor/1.1' req = urllib.request.Request(url, data=data, headers=headers, method=method) try: with urllib.request.urlopen(req, timeout=self.REQUEST_TIMEOUT) as resp: body = resp.read().decode('utf-8', errors='replace') return resp.status, body except urllib.error.HTTPError as e: body = e.read().decode('utf-8', errors='replace') if e.fp else str(e) return e.code, body except urllib.error.URLError as e: return 0, str(e.reason) except Exception as e: return 0, str(e) def _send_with_retry(self, send_fn) -> Dict[str, Any]: """Wrap a send function with rate limiting and retry logic.""" if not self._rate_limiter.allow(): wait = self._rate_limiter.wait_time() return { 'success': False, 'error': f'Rate limited. Retry in {wait:.0f}s', 'rate_limited': True } last_error = '' for attempt in range(self.MAX_RETRIES): try: status, body = send_fn() if 200 <= status < 300: return {'success': True, 'error': None} last_error = f'HTTP {status}: {body[:200]}' except Exception as e: last_error = str(e) if attempt < self.MAX_RETRIES - 1: time.sleep(self.RETRY_DELAYS[attempt]) return {'success': False, 'error': last_error} # ─── Telegram ──────────────────────────────────────────────────── class TelegramChannel(NotificationChannel): """Telegram Bot API channel using HTML parse mode.""" API_BASE = 'https://api.telegram.org/bot{token}/sendMessage' API_PHOTO = 'https://api.telegram.org/bot{token}/sendPhoto' MAX_LENGTH = 4096 SEVERITY_ICONS = { 'CRITICAL': '\U0001F534', # red circle 'WARNING': '\U0001F7E1', # yellow circle 'INFO': '\U0001F535', # blue circle 'OK': '\U0001F7E2', # green circle 'UNKNOWN': '\u26AA', # white circle } def __init__(self, bot_token: str, chat_id: str, topic_id: str = ''): super().__init__() token = bot_token.strip() # Strip 'bot' prefix if user included it (API_BASE already adds it) if token.lower().startswith('bot') and ':' in token[3:]: token = token[3:] self.bot_token = token self.chat_id = chat_id.strip() # Topic ID for supergroups with topics enabled (message_thread_id) self.topic_id = topic_id.strip() if topic_id else '' def validate_config(self) -> Tuple[bool, str]: if not self.bot_token: return False, 'Bot token is required' if not self.chat_id: return False, 'Chat ID is required' if ':' not in self.bot_token: return False, 'Invalid bot token format (expected BOT_ID:TOKEN)' return True, '' def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: icon = self.SEVERITY_ICONS.get(severity, self.SEVERITY_ICONS['INFO']) html_msg = f"{icon} {self._escape_html(title)}\n\n{self._escape_html(message)}" # Split long messages chunks = self._split_message(html_msg) result = {'success': True, 'error': None, 'channel': 'telegram'} for chunk in chunks: res = self._send_with_retry(lambda c=chunk: self._post_message(c)) if not res['success']: result = {**res, 'channel': 'telegram'} break return result def send_photo(self, photo_url: str, caption: str = '') -> Dict[str, Any]: """Send a photo to Telegram chat.""" url = self.API_PHOTO.format(token=self.bot_token) payload = { 'chat_id': self.chat_id, 'photo': photo_url, } # Add topic ID for supergroups with topics enabled if self.topic_id: try: payload['message_thread_id'] = int(self.topic_id) except ValueError: pass if caption: payload['caption'] = caption[:1024] # Telegram caption limit payload['parse_mode'] = 'HTML' body = json.dumps(payload).encode() headers = {'Content-Type': 'application/json'} result = self._send_with_retry( lambda: self._http_request(url, body, headers) ) result['channel'] = 'telegram' return result def test(self) -> Tuple[bool, str]: valid, err = self.validate_config() if not valid: return False, err result = self.send( 'ProxMenux Test', 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', 'INFO' ) return result['success'], result.get('error', '') def _post_message(self, text: str) -> Tuple[int, str]: url = self.API_BASE.format(token=self.bot_token) payload_dict = { 'chat_id': self.chat_id, 'text': text, 'parse_mode': 'HTML', 'disable_web_page_preview': True, } # Add topic ID for supergroups with topics enabled if self.topic_id: try: payload_dict['message_thread_id'] = int(self.topic_id) except ValueError: pass # Invalid topic_id, skip payload = json.dumps(payload_dict).encode('utf-8') return self._http_request(url, payload, {'Content-Type': 'application/json'}) def _split_message(self, text: str) -> list: if len(text) <= self.MAX_LENGTH: return [text] chunks = [] while text: if len(text) <= self.MAX_LENGTH: chunks.append(text) break split_at = text.rfind('\n', 0, self.MAX_LENGTH) if split_at == -1: split_at = self.MAX_LENGTH chunks.append(text[:split_at]) text = text[split_at:].lstrip('\n') return chunks @staticmethod def _escape_html(text: str) -> str: return (text .replace('&', '&') .replace('<', '<') .replace('>', '>')) # ─── Gotify ────────────────────────────────────────────────────── class GotifyChannel(NotificationChannel): """Gotify push notification channel with priority mapping.""" PRIORITY_MAP = { 'OK': 1, 'INFO': 2, 'UNKNOWN': 3, 'WARNING': 5, 'CRITICAL': 10, } def __init__(self, server_url: str, app_token: str): super().__init__() self.server_url = server_url.rstrip('/').strip() self.app_token = app_token.strip() def validate_config(self) -> Tuple[bool, str]: if not self.server_url: return False, 'Server URL is required' if not self.app_token: return False, 'Application token is required' if not self.server_url.startswith(('http://', 'https://')): return False, 'Server URL must start with http:// or https://' return True, '' def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: priority = self.PRIORITY_MAP.get(severity, 2) result = self._send_with_retry( lambda: self._post_message(title, message, priority) ) result['channel'] = 'gotify' return result def test(self) -> Tuple[bool, str]: valid, err = self.validate_config() if not valid: return False, err result = self.send( 'ProxMenux Test', 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', 'INFO' ) return result['success'], result.get('error', '') def _post_message(self, title: str, message: str, priority: int) -> Tuple[int, str]: url = f"{self.server_url}/message?token={self.app_token}" payload = json.dumps({ 'title': title, 'message': message, 'priority': priority, 'extras': { 'client::display': {'contentType': 'text/markdown'} } }).encode('utf-8') return self._http_request(url, payload, {'Content-Type': 'application/json'}) # ─── Discord ───────────────────────────────────────────────────── class DiscordChannel(NotificationChannel): """Discord webhook channel with color-coded embeds.""" MAX_EMBED_DESC = 2048 SEVERITY_COLORS = { 'CRITICAL': 0xED4245, # red 'WARNING': 0xFEE75C, # yellow 'INFO': 0x5865F2, # blurple 'OK': 0x57F287, # green 'UNKNOWN': 0x99AAB5, # grey } def __init__(self, webhook_url: str): super().__init__() self.webhook_url = webhook_url.strip() def validate_config(self) -> Tuple[bool, str]: if not self.webhook_url: return False, 'Webhook URL is required' if 'discord.com/api/webhooks/' not in self.webhook_url: return False, 'Invalid Discord webhook URL' return True, '' def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: color = self.SEVERITY_COLORS.get(severity, 0x5865F2) desc = message[:self.MAX_EMBED_DESC] if len(message) > self.MAX_EMBED_DESC else message embed = { 'title': title, 'description': desc, 'color': color, 'footer': {'text': 'ProxMenux Monitor'}, 'timestamp': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), } # Use structured fields from render_template if available rendered_fields = (data or {}).get('_rendered_fields', []) if rendered_fields: embed['fields'] = [ {'name': name, 'value': val[:1024], 'inline': True} for name, val in rendered_fields[:25] # Discord limit: 25 fields ] elif data: fields = [] if data.get('category'): fields.append({'name': 'Category', 'value': data['category'], 'inline': True}) if data.get('hostname'): fields.append({'name': 'Host', 'value': data['hostname'], 'inline': True}) if data.get('severity'): fields.append({'name': 'Severity', 'value': data['severity'], 'inline': True}) if fields: embed['fields'] = fields result = self._send_with_retry( lambda: self._post_webhook(embed) ) result['channel'] = 'discord' return result def test(self) -> Tuple[bool, str]: valid, err = self.validate_config() if not valid: return False, err result = self.send( 'ProxMenux Test', 'Notification service is working correctly.\nThis is a test message from ProxMenux Monitor.', 'INFO' ) return result['success'], result.get('error', '') def _post_webhook(self, embed: Dict) -> Tuple[int, str]: payload = json.dumps({ 'username': 'ProxMenux', 'embeds': [embed] }).encode('utf-8') return self._http_request( self.webhook_url, payload, {'Content-Type': 'application/json'} ) # ─── Email Channel ────────────────────────────────────────────── class EmailChannel(NotificationChannel): """Email notification channel using SMTP (smtplib) or sendmail fallback. Config keys: host, port, username, password, tls_mode (none|starttls|ssl), from_address, to_addresses (comma-separated), subject_prefix, timeout """ def __init__(self, config: Dict[str, str]): super().__init__() self.host = config.get('host', '') self.port = int(config.get('port', 587) or 587) self.username = config.get('username', '') self.password = config.get('password', '') self.tls_mode = config.get('tls_mode', 'starttls') # none | starttls | ssl self.from_address = config.get('from_address', '') self.to_addresses = self._parse_recipients(config.get('to_addresses', '')) self.subject_prefix = config.get('subject_prefix', '[ProxMenux]') self.timeout = int(config.get('timeout', 10) or 10) @staticmethod def _parse_recipients(raw) -> list: if isinstance(raw, list): return [a.strip() for a in raw if a.strip()] return [addr.strip() for addr in str(raw).split(',') if addr.strip()] def validate_config(self) -> Tuple[bool, str]: if not self.to_addresses: return False, 'No recipients configured' if not self.from_address: return False, 'No from address configured' # Must have SMTP host OR local sendmail available if not self.host: import os if not os.path.exists('/usr/sbin/sendmail'): return False, 'No SMTP host configured and /usr/sbin/sendmail not found' return True, '' def send(self, title: str, message: str, severity: str = 'INFO', data: Optional[Dict] = None) -> Dict[str, Any]: subject = f"{self.subject_prefix} [{severity}] {title}" def _do_send(): if self.host: return self._send_smtp(subject, message, severity, data) else: return self._send_sendmail(subject, message, severity, data) return self._send_with_retry(_do_send) def _send_smtp(self, subject: str, body: str, severity: str, data: Optional[Dict] = None) -> Tuple[int, str]: import smtplib from email.message import EmailMessage msg = EmailMessage() msg['Subject'] = subject msg['From'] = self.from_address msg['To'] = ', '.join(self.to_addresses) msg.set_content(body) # Add HTML alternative html_body = self._format_html(subject, body, severity, data) if html_body: msg.add_alternative(html_body, subtype='html') server = None try: import ssl as _ssl if self.tls_mode == 'ssl': ctx = _ssl.create_default_context() server = smtplib.SMTP_SSL(self.host, self.port, timeout=self.timeout, context=ctx) server.ehlo() else: server = smtplib.SMTP(self.host, self.port, timeout=self.timeout) server.ehlo() if self.tls_mode == 'starttls': ctx = _ssl.create_default_context() server.starttls(context=ctx) server.ehlo() # Re-identify after TLS -- server re-announces AUTH if self.username and self.password: server.login(self.username, self.password) server.send_message(msg) server.quit() server = None return 200, 'OK' except smtplib.SMTPAuthenticationError as e: return 0, f'SMTP authentication failed (check username/password or app-specific password): {e}' except smtplib.SMTPNotSupportedError as e: return 0, (f'SMTP AUTH not supported by server. ' f'This may mean the server requires OAuth2 or an App Password ' f'instead of regular credentials: {e}') except smtplib.SMTPConnectError as e: return 0, f'SMTP connection failed: {e}' except smtplib.SMTPException as e: return 0, f'SMTP error: {e}' except _ssl.SSLError as e: return 0, f'TLS/SSL error (check TLS mode and port): {e}' except (OSError, TimeoutError) as e: return 0, f'Connection error: {e}' finally: if server: try: server.quit() except Exception: pass def _send_sendmail(self, subject: str, body: str, severity: str, data: Optional[Dict] = None) -> Tuple[int, str]: import os import subprocess from email.message import EmailMessage sendmail = '/usr/sbin/sendmail' if not os.path.exists(sendmail): return 0, 'sendmail not found at /usr/sbin/sendmail' msg = EmailMessage() msg['Subject'] = subject msg['From'] = self.from_address or 'proxmenux@localhost' msg['To'] = ', '.join(self.to_addresses) msg.set_content(body) # Add HTML alternative html_body = self._format_html(subject, body, severity, data) if html_body: msg.add_alternative(html_body, subtype='html') try: proc = subprocess.run( [sendmail, '-t', '-oi'], input=msg.as_string(), capture_output=True, text=True, timeout=30 ) if proc.returncode == 0: return 200, 'OK' return 0, f'sendmail failed (rc={proc.returncode}): {proc.stderr[:200]}' except subprocess.TimeoutExpired: return 0, 'sendmail timed out after 30s' except Exception as e: return 0, f'sendmail error: {e}' # Severity -> accent colour + label _SEV_STYLE = { 'CRITICAL': {'color': '#dc2626', 'bg': '#fef2f2', 'border': '#fecaca', 'label': 'Critical'}, 'WARNING': {'color': '#d97706', 'bg': '#fffbeb', 'border': '#fde68a', 'label': 'Warning'}, 'INFO': {'color': '#2563eb', 'bg': '#eff6ff', 'border': '#bfdbfe', 'label': 'Information'}, 'OK': {'color': '#16a34a', 'bg': '#f0fdf4', 'border': '#bbf7d0', 'label': 'Resolved'}, } _SEV_DEFAULT = {'color': '#6b7280', 'bg': '#f9fafb', 'border': '#e5e7eb', 'label': 'Notice'} # Group -> human-readable section header for the email _GROUP_LABELS = { 'vm_ct': 'Virtual Machine / Container', 'backup': 'Backup & Snapshot', 'resources': 'System Resources', 'storage': 'Storage', 'network': 'Network', 'security': 'Security', 'cluster': 'Cluster', 'services': 'System Services', 'health': 'Health Monitor', 'updates': 'System Updates', 'other': 'System Notification', } def _format_html(self, subject: str, body: str, severity: str, data: Optional[Dict] = None) -> str: """Build a professional HTML email with structured data sections.""" import html as html_mod import time as _time data = data or {} sev = self._SEV_STYLE.get(severity, self._SEV_DEFAULT) # Determine group for section header event_type = data.get('_event_type', '') group = data.get('_group', 'other') section_label = self._GROUP_LABELS.get(group, 'System Notification') # Timestamp ts = data.get('timestamp', '') or _time.strftime('%Y-%m-%d %H:%M:%S UTC', _time.gmtime()) # ── Build structured detail rows from known data fields ── detail_rows = self._build_detail_rows(data, event_type, group, html_mod) # ── Fallback: if no structured rows, render body text lines ── if not detail_rows: for line in body.split('\n'): stripped = line.strip() if not stripped: continue # Try to split "Label: value" patterns if ':' in stripped: lbl, _, val = stripped.partition(':') if val.strip() and len(lbl) < 40: detail_rows.append((html_mod.escape(lbl.strip()), html_mod.escape(val.strip()))) continue detail_rows.append(('', html_mod.escape(stripped))) # ── Render detail rows as HTML table ── rows_html = '' for label, value in detail_rows: if label: rows_html += f''' {label} {value} ''' else: # Full-width row (no label, just description text) rows_html += f''' {value} ''' # ── Reason / details block (long text, displayed separately) ── reason = data.get('reason', '') reason_html = '' if reason and len(reason) > 80: reason_html = f'''

Details

{html_mod.escape(reason)}

''' # ── Clean subject for display (remove prefix if present) ── display_title = subject for prefix in [self.subject_prefix, '[CRITICAL]', '[WARNING]', '[INFO]', '[OK]']: display_title = display_title.replace(prefix, '').strip() return f'''

ProxMenux Monitor

{html_mod.escape(section_label)} Report

{sev['label'].upper()}

{html_mod.escape(display_title)}

Host: {html_mod.escape(data.get('hostname', ''))} {html_mod.escape(ts)}
{rows_html}
{reason_html}
ProxMenux Notification Service proxmenux.com
''' @staticmethod def _build_detail_rows(data: Dict, event_type: str, group: str, html_mod) -> list: """Build structured (label, value) rows from event data. Returns list of (label_html, value_html) tuples. An empty label means a full-width descriptive row. """ esc = html_mod.escape rows = [] def _add(label: str, value, fmt: str = ''): """Add a row if value is truthy.""" v = str(value).strip() if value else '' if not v or v == '0' and label not in ('Failures',): return if fmt == 'severity': sev_colors = { 'CRITICAL': '#dc2626', 'WARNING': '#d97706', 'INFO': '#2563eb', 'OK': '#16a34a', } c = sev_colors.get(v, '#6b7280') rows.append((esc(label), f'{esc(v)}')) elif fmt == 'code': rows.append((esc(label), f'{esc(v)}')) elif fmt == 'bold': rows.append((esc(label), f'{esc(v)}')) else: rows.append((esc(label), esc(v))) # ── Common fields present in most events ── # ── VM / CT events ── if group == 'vm_ct': _add('VM/CT ID', data.get('vmid'), 'code') _add('Name', data.get('vmname'), 'bold') _add('Action', event_type.replace('_', ' ').replace('vm ', 'VM ').replace('ct ', 'CT ').title()) _add('Target Node', data.get('target_node')) _add('Reason', data.get('reason')) # ── Backup events ── elif group == 'backup': _add('VM/CT ID', data.get('vmid'), 'code') _add('Name', data.get('vmname'), 'bold') _add('Status', 'Failed' if 'fail' in event_type else 'Completed' if 'complete' in event_type else 'Started', 'severity' if 'fail' in event_type else '') _add('Size', data.get('size')) _add('Duration', data.get('duration')) _add('Snapshot', data.get('snapshot_name'), 'code') # For backup_complete/fail with parsed body, add short reason only reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Resources ── elif group == 'resources': _add('Metric', event_type.replace('_', ' ').title()) _add('Current Value', data.get('value'), 'bold') _add('Threshold', data.get('threshold')) _add('CPU Cores', data.get('cores')) _add('Memory', f"{data.get('used', '')} / {data.get('total', '')}" if data.get('used') else '') _add('Temperature', f"{data.get('value')}C" if 'temp' in event_type else '') # ── Storage ── elif group == 'storage': if 'disk_space' in event_type: _add('Mount Point', data.get('mount'), 'code') _add('Usage', f"{data.get('used')}%", 'bold') _add('Available', data.get('available')) elif 'io_error' in event_type: _add('Device', data.get('device'), 'code') _add('Severity', data.get('severity', ''), 'severity') elif 'unavailable' in event_type: _add('Storage Name', data.get('storage_name'), 'bold') _add('Type', data.get('storage_type'), 'code') reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Network ── elif group == 'network': _add('Interface', data.get('interface'), 'code') _add('Latency', f"{data.get('value')}ms" if data.get('value') else '') _add('Threshold', f"{data.get('threshold')}ms" if data.get('threshold') else '') reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Security ── elif group == 'security': _add('Event', event_type.replace('_', ' ').title()) _add('Source IP', data.get('source_ip'), 'code') _add('Username', data.get('username'), 'code') _add('Service', data.get('service')) _add('Jail', data.get('jail'), 'code') _add('Failures', data.get('failures')) _add('Change', data.get('change_details')) # ── Cluster ── elif group == 'cluster': _add('Event', event_type.replace('_', ' ').title()) _add('Node', data.get('node_name'), 'bold') _add('Quorum', data.get('quorum')) _add('Nodes Affected', data.get('entity_list')) # ── Services ── elif group == 'services': _add('Service', data.get('service_name'), 'code') _add('Process', data.get('process'), 'code') _add('Event', event_type.replace('_', ' ').title()) reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Health monitor ── elif group == 'health': _add('Category', data.get('category'), 'bold') _add('Severity', data.get('severity', ''), 'severity') if data.get('original_severity'): _add('Previous Severity', data.get('original_severity'), 'severity') _add('Duration', data.get('duration')) _add('Active Issues', data.get('count')) reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) # ── Updates ── elif group == 'updates': _add('Total Updates', data.get('total_count'), 'bold') _add('Security Updates', data.get('security_count')) _add('Proxmox Updates', data.get('pve_count')) _add('Kernel Updates', data.get('kernel_count')) imp = data.get('important_list', '') if imp and imp != 'none': # Render each package on its own line inside a single cell pkg_lines = [l.strip() for l in imp.split('\n') if l.strip()] if pkg_lines: pkg_html = '
'.join( f'{esc(p)}' for p in pkg_lines ) rows.append((esc('Important Packages'), pkg_html)) _add('Current Version', data.get('current_version'), 'code') _add('New Version', data.get('new_version'), 'code') # ── Other / unknown ── else: reason = data.get('reason', '') if reason and len(reason) <= 80: _add('Details', reason) return rows def test(self) -> Tuple[bool, str]: import socket as _socket hostname = _socket.gethostname().split('.')[0] result = self.send( 'ProxMenux Test Notification', 'This is a test notification from ProxMenux Monitor.\n' 'If you received this, your email channel is working correctly.', 'INFO', data={ 'hostname': hostname, '_event_type': 'webhook_test', '_group': 'other', 'reason': 'Email notification channel connectivity verified successfully. ' 'You will receive alerts from ProxMenux Monitor at this address.', } ) return result.get('success', False), result.get('error', '') # ─── Channel Factory ───────────────────────────────────────────── CHANNEL_TYPES = { 'telegram': { 'name': 'Telegram', 'config_keys': ['bot_token', 'chat_id'], 'class': TelegramChannel, }, 'gotify': { 'name': 'Gotify', 'config_keys': ['url', 'token'], 'class': GotifyChannel, }, 'discord': { 'name': 'Discord', 'config_keys': ['webhook_url'], 'class': DiscordChannel, }, 'email': { 'name': 'Email (SMTP)', 'config_keys': ['host', 'port', 'username', 'password', 'tls_mode', 'from_address', 'to_addresses', 'subject_prefix'], 'class': EmailChannel, }, } def create_channel(channel_type: str, config: Dict[str, str]) -> Optional[NotificationChannel]: """Create a channel instance from type name and config dict. Args: channel_type: 'telegram', 'gotify', or 'discord' config: Dict with channel-specific keys (see CHANNEL_TYPES) Returns: Channel instance or None if creation fails """ try: if channel_type == 'telegram': return TelegramChannel( bot_token=config.get('bot_token', ''), chat_id=config.get('chat_id', ''), topic_id=config.get('topic_id', '') ) elif channel_type == 'gotify': return GotifyChannel( server_url=config.get('url', ''), app_token=config.get('token', '') ) elif channel_type == 'discord': return DiscordChannel( webhook_url=config.get('webhook_url', '') ) elif channel_type == 'email': return EmailChannel(config) except Exception as e: print(f"[NotificationChannels] Failed to create {channel_type}: {e}") return None