""" Authentication Manager Module Handles all authentication-related operations including: - Loading/saving auth configuration - Password hashing and verification - JWT token generation and validation - Auth status checking - Two-Factor Authentication (2FA/TOTP) """ import os import json import hashlib import hmac import secrets import base64 from datetime import datetime, timedelta from pathlib import Path try: import jwt JWT_AVAILABLE = True except ImportError: JWT_AVAILABLE = False print("Warning: PyJWT not available. Authentication features will be limited.") try: import pyotp import segno import io import base64 TOTP_AVAILABLE = True except ImportError: TOTP_AVAILABLE = False print("Warning: pyotp/segno not available. 2FA features will be disabled.") # Configuration CONFIG_DIR = Path.home() / ".config" / "proxmenux-monitor" AUTH_CONFIG_FILE = CONFIG_DIR / "auth.json" # Sentinel for legacy installs that started under the hardcoded JWT_SECRET. # The audit (Tier 4 #22) flagged that constant — anyone with access to the # public repo could forge JWTs against any deployment. We now generate a # random per-install secret on first use and persist it in auth.json. Tokens # issued under the legacy secret stop verifying once the migration runs; # users have to log in once. That's intentional and accepted by the audit. _LEGACY_JWT_SECRET = "proxmenux-monitor-secret-key-change-in-production" JWT_ALGORITHM = "HS256" TOKEN_EXPIRATION_HOURS = 24 # Audit Tier 5: bind tokens to issuer/audience so they can't be cross-used # against another deployment / service that happens to share the same # JWT_SECRET. Verified in `verify_token` with a permissive fallback for # tokens issued before the rollout. JWT_ISSUER = "proxmenux-monitor" JWT_AUDIENCE = "api" # Password-hashing format: pbkdf2_sha256 with 600k iterations (OWASP 2023+ # baseline). Uses only stdlib (`hashlib.pbkdf2_hmac`), no external deps. # Format on disk: "pbkdf2_sha256$$$". # Legacy SHA-256 (single-line 64 hex chars) is still recognized for one final # verify and re-hashed on the next successful login (lazy migration). _PWD_PBKDF2_ITERS = 600000 _PWD_PBKDF2_PREFIX = "pbkdf2_sha256$" def ensure_config_dir(): """Ensure the configuration directory exists""" CONFIG_DIR.mkdir(parents=True, exist_ok=True) def load_auth_config(): """ Load authentication configuration from file Returns dict with structure: { "enabled": bool, "username": str, "password_hash": str, "declined": bool, "configured": bool, "totp_enabled": bool, # 2FA enabled flag "totp_secret": str, # TOTP secret key "backup_codes": list, # List of backup codes "api_tokens": list, # List of stored API token metadata "revoked_tokens": list # List of revoked token hashes } """ if not AUTH_CONFIG_FILE.exists(): return { "enabled": False, "username": None, "password_hash": None, "declined": False, "configured": False, "totp_enabled": False, "totp_secret": None, "backup_codes": [], "api_tokens": [], "revoked_tokens": [] } try: with open(AUTH_CONFIG_FILE, 'r') as f: config = json.load(f) # Ensure all required fields exist config.setdefault("declined", False) config.setdefault("configured", config.get("enabled", False) or config.get("declined", False)) config.setdefault("totp_enabled", False) config.setdefault("totp_secret", None) config.setdefault("backup_codes", []) config.setdefault("api_tokens", []) config.setdefault("revoked_tokens", []) return config except Exception as e: print(f"Error loading auth config: {e}") return { "enabled": False, "username": None, "password_hash": None, "declined": False, "configured": False, "totp_enabled": False, "totp_secret": None, "backup_codes": [], "api_tokens": [], "revoked_tokens": [] } def save_auth_config(config): """Save authentication configuration to file""" ensure_config_dir() try: with open(AUTH_CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) return True except Exception as e: print(f"Error saving auth config: {e}") return False def _get_jwt_secret(): """Return the per-install JWT signing secret, generating one on first use. The secret lives in `auth.json` under the `jwt_secret` key. On a fresh install or when migrating from the legacy hardcoded constant, we mint a new `secrets.token_urlsafe(32)`-derived value and persist it. Once persisted it never changes (rotation would log out every active session). Audit Tier 4 #22. """ config = load_auth_config() sec = config.get("jwt_secret") if isinstance(sec, str) and len(sec) >= 32: return sec new_secret = secrets.token_urlsafe(48) config["jwt_secret"] = new_secret save_auth_config(config) return new_secret # Server-side mirror of the frontend's `validatePasswordStrength`. Defense # in depth: the UI enforces these rules but a direct API caller (curl, # scripted setup, custom client) bypasses the JS — so the same minimum has # to be enforced here. Audit Tier 6 — Política de password débil. _OBVIOUS_PASSWORDS = { "password", "password1", "password123", "12345678", "123456789", "1234567890", "qwerty", "qwertyuiop", "letmein", "welcome", "admin", "administrator", "root", "proxmox", "proxmenux", "changeme", "abcdefgh", } def _validate_password_strength(pw): """Return None if `pw` passes policy, otherwise a human-readable reason.""" if not isinstance(pw, str) or len(pw) < 10: return "Password must be at least 10 characters" categories = sum([ any(c.islower() for c in pw), any(c.isupper() for c in pw), any(c.isdigit() for c in pw), any(not c.isalnum() for c in pw), ]) if categories < 3: return "Password must mix at least 3 of: lowercase, uppercase, digits, symbols" if pw.lower() in _OBVIOUS_PASSWORDS: return "That password is in the common-passwords list — pick something else" return None def hash_password(password): """Hash a password with PBKDF2-HMAC-SHA256. Format: `pbkdf2_sha256$$$`. Per-password 16-byte random salt; 600k iterations (OWASP 2023+ baseline). Stdlib only — no bcrypt / argon2-cffi dependency added to the AppImage build. See audit Tier 4 #23. """ salt = secrets.token_bytes(16) derived = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, _PWD_PBKDF2_ITERS, dklen=32) return ( f"{_PWD_PBKDF2_PREFIX}{_PWD_PBKDF2_ITERS}$" f"{base64.b64encode(salt).decode('ascii')}$" f"{base64.b64encode(derived).decode('ascii')}" ) def _verify_pbkdf2(password, stored): """Verify a PBKDF2 hash. Returns True on match, False on any failure.""" try: # `pbkdf2_sha256$$$` body = stored[len(_PWD_PBKDF2_PREFIX):] iters_str, salt_b64, hash_b64 = body.split('$', 2) iters = int(iters_str) salt = base64.b64decode(salt_b64) expected = base64.b64decode(hash_b64) except Exception: return False derived = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, iters, dklen=len(expected)) return hmac.compare_digest(derived, expected) def _is_legacy_sha256(stored): """True if `stored` looks like the old unsalted SHA-256 hex digest.""" if not isinstance(stored, str): return False if len(stored) != 64: return False return all(c in '0123456789abcdef' for c in stored.lower()) def verify_password(password, password_hash): """Verify a password against its hash. Recognizes both the new PBKDF2 format and the legacy unsalted SHA-256. The legacy path is kept around for one final verify so existing accounts can log in once and trigger a rehash via `_maybe_rehash_password` — see lazy migration in `authenticate()`. """ if not isinstance(password_hash, str) or not password_hash: return False if password_hash.startswith(_PWD_PBKDF2_PREFIX): return _verify_pbkdf2(password, password_hash) if _is_legacy_sha256(password_hash): legacy = hashlib.sha256(password.encode('utf-8')).hexdigest() return hmac.compare_digest(legacy, password_hash) return False def _maybe_rehash_password(password, current_hash): """If the stored hash is legacy SHA-256, return a fresh PBKDF2 hash to persist. Returns None when no rehash is needed (already PBKDF2 or unrecognized). Caller is responsible for saving the new hash back to auth.json. """ if _is_legacy_sha256(current_hash): return hash_password(password) return None def generate_token(username): """Generate a JWT token for the given username""" if not JWT_AVAILABLE: return None payload = { 'username': username, 'exp': datetime.utcnow() + timedelta(hours=TOKEN_EXPIRATION_HOURS), 'iat': datetime.utcnow(), 'iss': JWT_ISSUER, 'aud': JWT_AUDIENCE, } try: token = jwt.encode(payload, _get_jwt_secret(), algorithm=JWT_ALGORITHM) return token except Exception as e: print(f"Error generating token: {e}") return None # In-memory cache for revoked_tokens to avoid hitting disk on every request. # Invalidated by both TTL and the auth.json mtime so a revocation from another # process/restart still propagates within seconds. _REVOKED_CACHE = {'set': None, 'mtime': 0.0, 'fetched_at': 0.0} _REVOKED_TTL = 30.0 def _get_revoked_tokens_cached(): """Return a frozenset of revoked-token hashes, cached for ~30s.""" import time now = time.monotonic() try: mtime = AUTH_CONFIG_FILE.stat().st_mtime except OSError: mtime = 0.0 if ( _REVOKED_CACHE['set'] is not None and now - _REVOKED_CACHE['fetched_at'] < _REVOKED_TTL and mtime == _REVOKED_CACHE['mtime'] ): return _REVOKED_CACHE['set'] config = load_auth_config() revoked = frozenset(config.get("revoked_tokens", [])) _REVOKED_CACHE['set'] = revoked _REVOKED_CACHE['mtime'] = mtime _REVOKED_CACHE['fetched_at'] = now return revoked def _invalidate_revoked_cache(): """Force a re-read on the next verify_token call.""" _REVOKED_CACHE['set'] = None def verify_token_full(token): """Like `verify_token` but also returns the `scope` claim. Returns `(username, scope)` on success, `(None, None)` otherwise. Tokens issued before scope was added (no claim) get `'full_admin'` so legacy sessions keep working unchanged. Audit Tier 6 — Tokens API JWT 365 días sin scope. """ if not JWT_AVAILABLE or not token: return None, None try: token_hash = hashlib.sha256(token.encode()).hexdigest() if token_hash in _get_revoked_tokens_cached(): return None, None try: payload = jwt.decode( token, _get_jwt_secret(), algorithms=[JWT_ALGORITHM], audience=JWT_AUDIENCE, issuer=JWT_ISSUER, ) except (jwt.MissingRequiredClaimError, jwt.InvalidAudienceError, jwt.InvalidIssuerError): payload = jwt.decode(token, _get_jwt_secret(), algorithms=[JWT_ALGORITHM]) return payload.get('username'), payload.get('scope', 'full_admin') except jwt.ExpiredSignatureError: return None, None except jwt.InvalidTokenError: return None, None def verify_token(token): """ Verify a JWT token Returns username if valid, None otherwise Also checks if the token has been revoked """ if not JWT_AVAILABLE or not token: return None try: # Revoked-token list is cached in memory (TTL + mtime) so high-RPS # endpoints don't reread auth.json from disk on every @require_auth call. token_hash = hashlib.sha256(token.encode()).hexdigest() if token_hash in _get_revoked_tokens_cached(): return None # Verify against the per-install secret first. Tokens issued under the # legacy hardcoded secret were forgeable by anyone with read access to # the public repo — those are intentionally rejected so users get a # one-time relogin to mint a fresh token. # `iss`/`aud` claims are validated when present; tokens issued before # the iss/aud rollout (no claims) fall back to a permissive decode so # active sessions don't break on upgrade. try: payload = jwt.decode( token, _get_jwt_secret(), algorithms=[JWT_ALGORITHM], audience=JWT_AUDIENCE, issuer=JWT_ISSUER, ) except (jwt.MissingRequiredClaimError, jwt.InvalidAudienceError, jwt.InvalidIssuerError): payload = jwt.decode(token, _get_jwt_secret(), algorithms=[JWT_ALGORITHM]) return payload.get('username') except jwt.ExpiredSignatureError: print("Token has expired") return None except jwt.InvalidTokenError as e: print(f"Invalid token: {e}") return None def store_api_token_metadata(token, token_name="API Token"): """ Store API token metadata (hash, name, creation date) for listing and revocation. The actual token is never stored - only a hash for identification. """ config = load_auth_config() token_hash = hashlib.sha256(token.encode()).hexdigest() token_id = token_hash[:16] token_entry = { "id": token_id, "name": token_name, "token_hash": token_hash, "token_prefix": token[:12] + "...", "created_at": datetime.utcnow().isoformat() + "Z", "expires_at": (datetime.utcnow() + timedelta(days=365)).isoformat() + "Z" } config.setdefault("api_tokens", []) config["api_tokens"].append(token_entry) save_auth_config(config) return token_entry def list_api_tokens(): """ List all stored API token metadata (no actual tokens are returned). Returns list of token entries with id, name, prefix, creation and expiration dates. """ config = load_auth_config() tokens = config.get("api_tokens", []) revoked = set(config.get("revoked_tokens", [])) result = [] for t in tokens: entry = { "id": t.get("id"), "name": t.get("name", "API Token"), "token_prefix": t.get("token_prefix", "***"), "created_at": t.get("created_at"), "expires_at": t.get("expires_at"), "revoked": t.get("token_hash") in revoked } result.append(entry) return result def revoke_api_token(token_id): """ Revoke an API token by its ID. Adds the token hash to the revoked list so it fails verification. Returns (success: bool, message: str) """ config = load_auth_config() tokens = config.get("api_tokens", []) target = None for t in tokens: if t.get("id") == token_id: target = t break if not target: return False, "Token not found" token_hash = target.get("token_hash") config.setdefault("revoked_tokens", []) if token_hash in config["revoked_tokens"]: return False, "Token is already revoked" config["revoked_tokens"].append(token_hash) # Remove from the active tokens list config["api_tokens"] = [t for t in tokens if t.get("id") != token_id] if save_auth_config(config): _invalidate_revoked_cache() return True, "Token revoked successfully" else: return False, "Failed to save configuration" def get_auth_status(): """ Get current authentication status Returns dict with: { "auth_enabled": bool, "auth_configured": bool, "declined": bool, "username": str or None, "authenticated": bool, "totp_enabled": bool # 2FA status } """ config = load_auth_config() return { "auth_enabled": config.get("enabled", False), "auth_configured": config.get("configured", False), "declined": config.get("declined", False), "username": config.get("username") if config.get("enabled") else None, "authenticated": False, "totp_enabled": config.get("totp_enabled", False) # Include 2FA status } def setup_auth(username, password): """ Set up authentication with username and password Returns (success: bool, message: str) """ # Refuse if auth has already been configured. Without this guard an # unauthenticated POST to /api/auth/setup would let an attacker overwrite # the existing admin credentials and take over the account. See audit # Tier 1 #4. existing = load_auth_config() if existing.get("configured", False): return False, "Authentication is already configured" if not username or not password: return False, "Username and password are required" pw_err = _validate_password_strength(password) if pw_err: return False, pw_err config = { "enabled": True, "username": username, "password_hash": hash_password(password), "declined": False, "configured": True, "totp_enabled": False, "totp_secret": None, "backup_codes": [] } if save_auth_config(config): return True, "Authentication configured successfully" else: return False, "Failed to save authentication configuration" def decline_auth(): """ Mark authentication as declined by user Returns (success: bool, message: str) """ config = load_auth_config() config["enabled"] = False config["declined"] = True config["configured"] = True config["username"] = None config["password_hash"] = None config["totp_enabled"] = False config["totp_secret"] = None config["backup_codes"] = [] if save_auth_config(config): return True, "Authentication declined" else: return False, "Failed to save configuration" def disable_auth(): """ Disable authentication (different from decline - can be re-enabled) Returns (success: bool, message: str) """ config = load_auth_config() config["enabled"] = False config["username"] = None config["password_hash"] = None config["declined"] = False config["configured"] = False config["totp_enabled"] = False config["totp_secret"] = None config["backup_codes"] = [] # Intentionally preserve `api_tokens` and `revoked_tokens` across # disable→re-enable cycles. Wiping them allowed a previously revoked # token to verify again because nothing on the deny-list would reject # it. Audit Tier 5 — disable_auth() borra revoked_tokens. _invalidate_revoked_cache() if save_auth_config(config): return True, "Authentication disabled" else: return False, "Failed to save configuration" def enable_auth(): """ Enable authentication (must already be configured) Returns (success: bool, message: str) """ config = load_auth_config() if not config.get("username") or not config.get("password_hash"): return False, "Authentication not configured. Please set up username and password first." config["enabled"] = True config["declined"] = False if save_auth_config(config): return True, "Authentication enabled" else: return False, "Failed to save configuration" def change_password(old_password, new_password, totp_code=None): """ Change the authentication password. When 2FA is enabled on the account, a valid TOTP code (or backup code) is REQUIRED in addition to the current password — otherwise an attacker who obtained the password (e.g. via shoulder-surfing or phishing) could rotate it without the second factor and lock the legitimate user out. See audit Tier 1 #10. Returns (success: bool, message: str). """ config = load_auth_config() if not config.get("enabled"): return False, "Authentication is not enabled" if not verify_password(old_password, config.get("password_hash", "")): return False, "Current password is incorrect" pw_err = _validate_password_strength(new_password) if pw_err: return False, f"New {pw_err[0].lower()}{pw_err[1:]}" # 2FA gate: if the account has TOTP enabled, the caller must prove they # also hold the second factor. if config.get("totp_enabled"): username = config.get("username") if not totp_code: return False, "2FA code required to change password" # Try TOTP first, then fall back to backup code (same UX as login). ok, _ = verify_totp(username, totp_code, use_backup=False) if not ok: ok, _ = verify_totp(username, totp_code, use_backup=True) if not ok: return False, "Invalid 2FA code" # Reload after possible backup-code consumption inside verify_totp. config = load_auth_config() config["password_hash"] = hash_password(new_password) if save_auth_config(config): return True, "Password changed successfully" else: return False, "Failed to save new password" def generate_totp_secret(): """Generate a new TOTP secret key""" if not TOTP_AVAILABLE: return None return pyotp.random_base32() def generate_totp_qr(username, secret): """ Generate a QR code for TOTP setup Returns base64 encoded SVG image """ if not TOTP_AVAILABLE: return None try: # Create TOTP URI totp = pyotp.TOTP(secret) uri = totp.provisioning_uri( name=username, issuer_name="ProxMenux Monitor" ) qr = segno.make(uri) # Convert to SVG string buffer = io.BytesIO() qr.save(buffer, kind='svg', scale=4, border=2) svg_bytes = buffer.getvalue() svg_content = svg_bytes.decode('utf-8') # Return as data URL svg_base64 = base64.b64encode(svg_content.encode()).decode('utf-8') return f"data:image/svg+xml;base64,{svg_base64}" except Exception as e: print(f"Error generating QR code: {e}") return None def generate_backup_codes(count=8): """Generate backup codes for 2FA recovery""" codes = [] for _ in range(count): # Generate 8-character alphanumeric code code = ''.join(secrets.choice('ABCDEFGHJKLMNPQRSTUVWXYZ23456789') for _ in range(8)) # Format as XXXX-XXXX for readability formatted = f"{code[:4]}-{code[4:]}" codes.append({ "code": hashlib.sha256(formatted.encode()).hexdigest(), "used": False }) return codes def setup_totp(username): """ Set up TOTP for a user Returns (success: bool, secret: str, qr_code: str, backup_codes: list, message: str) """ if not TOTP_AVAILABLE: return False, None, None, None, "2FA is not available (pyotp/segno not installed)" config = load_auth_config() if not config.get("enabled"): return False, None, None, None, "Authentication must be enabled first" if config.get("username") != username: return False, None, None, None, "Invalid username" # Generate new secret and backup codes secret = generate_totp_secret() qr_code = generate_totp_qr(username, secret) backup_codes_plain = [] backup_codes_hashed = generate_backup_codes() # Generate plain text backup codes for display (only returned once) for i in range(8): code = ''.join(secrets.choice('ABCDEFGHJKLMNPQRSTUVWXYZ23456789') for _ in range(8)) formatted = f"{code[:4]}-{code[4:]}" backup_codes_plain.append(formatted) backup_codes_hashed[i]["code"] = hashlib.sha256(formatted.encode()).hexdigest() # Store secret and hashed backup codes (not enabled yet until verified) config["totp_secret"] = secret config["backup_codes"] = backup_codes_hashed if save_auth_config(config): return True, secret, qr_code, backup_codes_plain, "2FA setup initiated" else: return False, None, None, None, "Failed to save 2FA configuration" def verify_totp(username, token, use_backup=False): """ Verify a TOTP token or backup code Returns (success: bool, message: str) """ if not TOTP_AVAILABLE and not use_backup: return False, "2FA is not available" config = load_auth_config() if not config.get("totp_enabled"): return False, "2FA is not enabled" if config.get("username") != username: return False, "Invalid username" # Check backup code if use_backup: token_hash = hashlib.sha256(token.encode()).hexdigest() for backup_code in config.get("backup_codes", []): if backup_code["code"] == token_hash and not backup_code["used"]: backup_code["used"] = True save_auth_config(config) return True, "Backup code accepted" return False, "Invalid or already used backup code" # Check TOTP token. `valid_window=1` accepts the previous, current and # next 30s timesteps, which is friendly to clock skew but lets a leaked # OTP be replayed for up to ~90s. Track the last successfully-used # timestep counter per account and reject anything <= that. import time as _time totp = pyotp.TOTP(config.get("totp_secret")) if not totp.verify(token, valid_window=1): return False, "Invalid 2FA code" # Find which counter the OTP corresponds to (one of current ± 1). interval = getattr(totp, 'interval', 30) current_counter = int(_time.time() // interval) matched_counter = None for c in (current_counter - 1, current_counter, current_counter + 1): try: if totp.at(c) == token: matched_counter = c break except Exception: continue if matched_counter is None: # `verify()` succeeded but we couldn't map to a counter — fail closed. return False, "Invalid 2FA code" last_counter = config.get("last_totp_counter", -1) if matched_counter <= last_counter: return False, "2FA code already used; wait for the next one" config["last_totp_counter"] = matched_counter save_auth_config(config) return True, "2FA verification successful" def enable_totp(username, verification_token): """ Enable TOTP after successful verification Returns (success: bool, message: str) """ if not TOTP_AVAILABLE: return False, "2FA is not available" config = load_auth_config() if not config.get("totp_secret"): return False, "2FA has not been set up. Please set up 2FA first." if config.get("username") != username: return False, "Invalid username" # Verify the token before enabling totp = pyotp.TOTP(config.get("totp_secret")) if not totp.verify(verification_token, valid_window=1): return False, "Invalid verification code. Please try again." config["totp_enabled"] = True if save_auth_config(config): return True, "2FA enabled successfully" else: return False, "Failed to enable 2FA" def disable_totp(username, password, totp_code=None): """ Disable TOTP (requires password confirmation AND a valid 2FA code). Previously this endpoint only required the password, which meant an attacker who phished or replayed the password could turn off the user's second factor entirely. Per audit Tier 1 #10 and the related frontend finding ("Disable 2FA solo password"), we now also demand a valid TOTP code (or backup code) to disable the protection it represents. Returns (success: bool, message: str). """ config = load_auth_config() if config.get("username") != username: return False, "Invalid username" if not verify_password(password, config.get("password_hash", "")): return False, "Invalid password" # If TOTP is currently active, require the second factor to disable it. if config.get("totp_enabled"): if not totp_code: return False, "2FA code required to disable 2FA" ok, _ = verify_totp(username, totp_code, use_backup=False) if not ok: ok, _ = verify_totp(username, totp_code, use_backup=True) if not ok: return False, "Invalid 2FA code" # Reload in case a backup code was consumed. config = load_auth_config() config["totp_enabled"] = False config["totp_secret"] = None config["backup_codes"] = [] if save_auth_config(config): return True, "2FA disabled successfully" else: return False, "Failed to disable 2FA" # ------------------------------------------------------------------- # SSL/HTTPS Certificate Management # ------------------------------------------------------------------- SSL_CONFIG_FILE = Path(os.environ.get("PROXMENUX_SSL_CONFIG", "/etc/proxmenux/ssl_config.json")) # Default Proxmox certificate paths PROXMOX_CERT_PATH = "/etc/pve/local/pve-ssl.pem" PROXMOX_KEY_PATH = "/etc/pve/local/pve-ssl.key" # When the admin uploads a custom certificate via the PVE UI, it's written # to `pveproxy-ssl.pem` instead and PVE itself prefers it. We do the same so # `detect_proxmox_certificates` reflects the cert the user actually wants # served. Issue #181. PROXMOX_CUSTOM_CERT_PATH = "/etc/pve/local/pveproxy-ssl.pem" PROXMOX_CUSTOM_KEY_PATH = "/etc/pve/local/pveproxy-ssl.key" def load_ssl_config(): """Load SSL configuration from file""" if not SSL_CONFIG_FILE.exists(): return { "enabled": False, "cert_path": "", "key_path": "", "source": "none" # "none", "proxmox", "custom" } try: with open(SSL_CONFIG_FILE, 'r') as f: config = json.load(f) config.setdefault("enabled", False) config.setdefault("cert_path", "") config.setdefault("key_path", "") config.setdefault("source", "none") return config except Exception: return { "enabled": False, "cert_path": "", "key_path": "", "source": "none" } def save_ssl_config(config): """Save SSL configuration to file""" try: SSL_CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True) with open(SSL_CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) return True except Exception as e: print(f"Error saving SSL config: {e}") return False def detect_proxmox_certificates(): """ Detect available Proxmox certificates. Returns dict with detection results. Prefers the custom-uploaded `pveproxy-ssl.pem` (what PVE itself uses when the admin uploaded a Let's Encrypt / commercial cert via the UI) and falls back to the default self-signed `pve-ssl.pem`. Issue #181 — detector solo encontraba pve-ssl.pem. """ result = { "proxmox_available": False, "proxmox_cert": PROXMOX_CERT_PATH, "proxmox_key": PROXMOX_KEY_PATH, "cert_info": None } if os.path.isfile(PROXMOX_CUSTOM_CERT_PATH) and os.path.isfile(PROXMOX_CUSTOM_KEY_PATH): result["proxmox_cert"] = PROXMOX_CUSTOM_CERT_PATH result["proxmox_key"] = PROXMOX_CUSTOM_KEY_PATH result["proxmox_available"] = True elif os.path.isfile(PROXMOX_CERT_PATH) and os.path.isfile(PROXMOX_KEY_PATH): result["proxmox_available"] = True if result["proxmox_available"]: # Try to get certificate info from whichever cert we picked. try: import subprocess cert_output = subprocess.run( ["openssl", "x509", "-in", result["proxmox_cert"], "-noout", "-subject", "-enddate", "-issuer"], capture_output=True, text=True, timeout=5 ) if cert_output.returncode == 0: lines = cert_output.stdout.strip().split('\n') info = {} for line in lines: if line.startswith("subject="): info["subject"] = line.replace("subject=", "").strip() elif line.startswith("notAfter="): info["expires"] = line.replace("notAfter=", "").strip() elif line.startswith("issuer="): issuer = line.replace("issuer=", "").strip() info["issuer"] = issuer info["is_self_signed"] = info.get("subject", "") == issuer result["cert_info"] = info except Exception: pass return result def validate_certificate_files(cert_path, key_path): """ Validate that cert and key files exist and are readable. Returns (valid: bool, message: str) """ if not cert_path or not key_path: return False, "Certificate and key paths are required" if not os.path.isfile(cert_path): return False, f"Certificate file not found: {cert_path}" if not os.path.isfile(key_path): return False, f"Key file not found: {key_path}" # Verify files are readable try: with open(cert_path, 'r') as f: content = f.read(100) if "BEGIN CERTIFICATE" not in content and "BEGIN TRUSTED CERTIFICATE" not in content: return False, "Certificate file does not appear to be a valid PEM certificate" with open(key_path, 'r') as f: content = f.read(100) if "BEGIN" not in content or "KEY" not in content: return False, "Key file does not appear to be a valid PEM key" except PermissionError: return False, "Cannot read certificate files. Check file permissions." except Exception as e: return False, f"Error reading certificate files: {str(e)}" # Verify cert and key match try: import subprocess cert_mod = subprocess.run( ["openssl", "x509", "-noout", "-modulus", "-in", cert_path], capture_output=True, text=True, timeout=5 ) key_mod = subprocess.run( ["openssl", "rsa", "-noout", "-modulus", "-in", key_path], capture_output=True, text=True, timeout=5 ) if cert_mod.returncode == 0 and key_mod.returncode == 0: if cert_mod.stdout.strip() != key_mod.stdout.strip(): return False, "Certificate and key do not match" except Exception: pass # Non-critical, proceed anyway return True, "Certificate files are valid" def configure_ssl(cert_path, key_path, source="custom"): """ Configure SSL with given certificate and key paths. Returns (success: bool, message: str) """ valid, message = validate_certificate_files(cert_path, key_path) if not valid: return False, message config = { "enabled": True, "cert_path": cert_path, "key_path": key_path, "source": source } if save_ssl_config(config): return True, "SSL configured successfully. Restart the monitor service to apply changes." else: return False, "Failed to save SSL configuration" def disable_ssl(): """Disable SSL and return to HTTP""" config = { "enabled": False, "cert_path": "", "key_path": "", "source": "none" } if save_ssl_config(config): return True, "SSL disabled. Restart the monitor service to apply changes." else: return False, "Failed to save SSL configuration" def get_ssl_context(): """ Get SSL context for Flask if SSL is configured and enabled. Returns tuple (cert_path, key_path) or None """ config = load_ssl_config() if not config.get("enabled"): return None cert_path = config.get("cert_path", "") key_path = config.get("key_path", "") if cert_path and key_path and os.path.isfile(cert_path) and os.path.isfile(key_path): return (cert_path, key_path) return None def authenticate(username, password, totp_token=None): """ Authenticate a user with username, password, and optional TOTP Returns (success: bool, token: str or None, requires_totp: bool, message: str) """ config = load_auth_config() if not config.get("enabled"): return False, None, False, "Authentication is not enabled" if username != config.get("username"): return False, None, False, "Invalid username or password" if not verify_password(password, config.get("password_hash", "")): return False, None, False, "Invalid username or password" # Lazy migration: if the stored hash is the legacy unsalted SHA-256, replace # it with a fresh PBKDF2 hash now that we have the cleartext in hand. The # next login uses the new hash; the legacy code path stays around only as # the recognition entry in `verify_password`. Audit Tier 4 #23. upgraded = _maybe_rehash_password(password, config.get("password_hash", "")) if upgraded: config["password_hash"] = upgraded try: save_auth_config(config) except Exception as e: # Don't block login if persistence fails — the user is still # authenticated and we can rehash on a future login attempt. print(f"[auth] Failed to persist rehashed password: {e}") if config.get("totp_enabled"): if not totp_token: # First step: password OK, now request TOTP code (not a failure) return False, None, True, "2FA code required" # Verify TOTP token or backup code success, message = verify_totp(username, totp_token, use_backup=len(totp_token) == 9) # Backup codes are formatted XXXX-XXXX if not success: # TOTP code is wrong: return requires_totp=False so the caller # logs it as a real authentication failure for Fail2Ban return False, None, False, "Invalid 2FA code" token = generate_token(username) if token: return True, token, False, "Authentication successful" else: return False, None, False, "Failed to generate authentication token"