Update notification_manager.py

This commit is contained in:
MacRimi
2026-03-17 15:04:29 +01:00
parent feaf7b8abd
commit ac8f06c3a2

View File

@@ -23,6 +23,7 @@ import time
import socket
import sqlite3
import threading
import base64
from queue import Queue, Empty
from datetime import datetime
from typing import Dict, Any, List, Optional
@@ -48,6 +49,55 @@ from notification_events import (
DB_PATH = Path('/usr/local/share/proxmenux/health_monitor.db')
SETTINGS_PREFIX = 'notification.'
ENCRYPTION_KEY_FILE = Path('/usr/local/share/proxmenux/.notification_key')
# Keys that contain sensitive data and should be encrypted
SENSITIVE_KEYS = {'ai_api_key', 'telegram.token', 'gotify.token', 'discord.webhook_url', 'email.password'}
# ─── Encryption for Sensitive Data ───────────────────────────────
def _get_or_create_encryption_key() -> bytes:
"""Get or create the encryption key for sensitive notification data."""
if ENCRYPTION_KEY_FILE.exists():
with open(ENCRYPTION_KEY_FILE, 'rb') as f:
return f.read()
# Create new random key
import secrets
key = secrets.token_bytes(32)
ENCRYPTION_KEY_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(ENCRYPTION_KEY_FILE, 'wb') as f:
f.write(key)
os.chmod(ENCRYPTION_KEY_FILE, 0o600) # Only root can read
return key
def encrypt_sensitive_value(value: str) -> str:
"""Encrypt a sensitive value using XOR. Returns base64 with 'ENC:' prefix."""
if not value or value.startswith('ENC:'):
return value
key = _get_or_create_encryption_key()
value_bytes = value.encode()
encrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(value_bytes))
return "ENC:" + base64.b64encode(encrypted).decode()
def decrypt_sensitive_value(encrypted: str) -> str:
"""Decrypt a sensitive value."""
if not encrypted or not encrypted.startswith('ENC:'):
return encrypted
try:
encrypted_data = encrypted[4:]
key = _get_or_create_encryption_key()
encrypted_bytes = base64.b64decode(encrypted_data)
decrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(encrypted_bytes))
return decrypted.decode()
except Exception as e:
print(f"[NotificationManager] Failed to decrypt value: {e}")
return encrypted
# Cooldown defaults (seconds)
DEFAULT_COOLDOWNS = {
@@ -379,6 +429,9 @@ class NotificationManager:
for key, value in cursor.fetchall():
# Strip prefix for internal use
short_key = key[len(SETTINGS_PREFIX):]
# Decrypt sensitive values
if short_key in SENSITIVE_KEYS and value and value.startswith('ENC:'):
value = decrypt_sensitive_value(value)
self._config[short_key] = value
conn.close()
except Exception as e:
@@ -1351,11 +1404,17 @@ class NotificationManager:
full_key = key if key.startswith(SETTINGS_PREFIX) else f'{SETTINGS_PREFIX}{key}'
short_key = full_key[len(SETTINGS_PREFIX):]
# Encrypt sensitive values before storing
store_value = str(value)
if short_key in SENSITIVE_KEYS and store_value and not store_value.startswith('ENC:'):
store_value = encrypt_sensitive_value(store_value)
cursor.execute('''
INSERT OR REPLACE INTO user_settings (setting_key, setting_value, updated_at)
VALUES (?, ?, ?)
''', (full_key, str(value), now))
''', (full_key, store_value, now))
# Keep decrypted value in memory for runtime use
self._config[short_key] = str(value)
# If user is explicitly enabling an event that defaults to disabled,