From ac8f06c3a249fbdf351a94b8772bc69ac8796564 Mon Sep 17 00:00:00 2001 From: MacRimi Date: Tue, 17 Mar 2026 15:04:29 +0100 Subject: [PATCH] Update notification_manager.py --- AppImage/scripts/notification_manager.py | 61 +++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/AppImage/scripts/notification_manager.py b/AppImage/scripts/notification_manager.py index f3447b41..c03253ae 100644 --- a/AppImage/scripts/notification_manager.py +++ b/AppImage/scripts/notification_manager.py @@ -23,6 +23,7 @@ import time import socket import sqlite3 import threading +import base64 from queue import Queue, Empty from datetime import datetime from typing import Dict, Any, List, Optional @@ -48,6 +49,55 @@ from notification_events import ( DB_PATH = Path('/usr/local/share/proxmenux/health_monitor.db') SETTINGS_PREFIX = 'notification.' +ENCRYPTION_KEY_FILE = Path('/usr/local/share/proxmenux/.notification_key') + +# Keys that contain sensitive data and should be encrypted +SENSITIVE_KEYS = {'ai_api_key', 'telegram.token', 'gotify.token', 'discord.webhook_url', 'email.password'} + + +# ─── Encryption for Sensitive Data ─────────────────────────────── + +def _get_or_create_encryption_key() -> bytes: + """Get or create the encryption key for sensitive notification data.""" + if ENCRYPTION_KEY_FILE.exists(): + with open(ENCRYPTION_KEY_FILE, 'rb') as f: + return f.read() + + # Create new random key + import secrets + key = secrets.token_bytes(32) + ENCRYPTION_KEY_FILE.parent.mkdir(parents=True, exist_ok=True) + with open(ENCRYPTION_KEY_FILE, 'wb') as f: + f.write(key) + os.chmod(ENCRYPTION_KEY_FILE, 0o600) # Only root can read + return key + + +def encrypt_sensitive_value(value: str) -> str: + """Encrypt a sensitive value using XOR. Returns base64 with 'ENC:' prefix.""" + if not value or value.startswith('ENC:'): + return value + + key = _get_or_create_encryption_key() + value_bytes = value.encode() + encrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(value_bytes)) + return "ENC:" + base64.b64encode(encrypted).decode() + + +def decrypt_sensitive_value(encrypted: str) -> str: + """Decrypt a sensitive value.""" + if not encrypted or not encrypted.startswith('ENC:'): + return encrypted + + try: + encrypted_data = encrypted[4:] + key = _get_or_create_encryption_key() + encrypted_bytes = base64.b64decode(encrypted_data) + decrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(encrypted_bytes)) + return decrypted.decode() + except Exception as e: + print(f"[NotificationManager] Failed to decrypt value: {e}") + return encrypted # Cooldown defaults (seconds) DEFAULT_COOLDOWNS = { @@ -379,6 +429,9 @@ class NotificationManager: for key, value in cursor.fetchall(): # Strip prefix for internal use short_key = key[len(SETTINGS_PREFIX):] + # Decrypt sensitive values + if short_key in SENSITIVE_KEYS and value and value.startswith('ENC:'): + value = decrypt_sensitive_value(value) self._config[short_key] = value conn.close() except Exception as e: @@ -1351,11 +1404,17 @@ class NotificationManager: full_key = key if key.startswith(SETTINGS_PREFIX) else f'{SETTINGS_PREFIX}{key}' short_key = full_key[len(SETTINGS_PREFIX):] + # Encrypt sensitive values before storing + store_value = str(value) + if short_key in SENSITIVE_KEYS and store_value and not store_value.startswith('ENC:'): + store_value = encrypt_sensitive_value(store_value) + cursor.execute(''' INSERT OR REPLACE INTO user_settings (setting_key, setting_value, updated_at) VALUES (?, ?, ?) - ''', (full_key, str(value), now)) + ''', (full_key, store_value, now)) + # Keep decrypted value in memory for runtime use self._config[short_key] = str(value) # If user is explicitly enabling an event that defaults to disabled,